From b873be65888d6607ec97b7ca9171e799aa6f9b6f Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Thu, 25 Apr 2024 17:24:41 +0800 Subject: [PATCH] [fix](routine-load) fix routine load lag is negative (#34113) * [fix](routine-load) fix routine load lag is negative (#33846) * fix merge error --- .../doris/load/routineload/KafkaProgress.java | 4 ++++ .../load/routineload/KafkaRoutineLoadJob.java | 15 +++++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java index 49542cd140..53c57a1cce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java @@ -85,6 +85,10 @@ public class KafkaProgress extends RoutineLoadProgress { return partitionIdToOffset.get(kafkaPartition); } + public Map getOffsetByPartition() { + return partitionIdToOffset; + } + public boolean containsPartition(Integer kafkaPartition) { return partitionIdToOffset.containsKey(kafkaPartition); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 8540bb4396..c00f16b7d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -285,16 +285,27 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { return false; } + private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment attachment) { + ((KafkaProgress) attachment.getProgress()).getOffsetByPartition().entrySet().stream() + .forEach(entity -> { + if (cachedPartitionWithLatestOffsets.containsKey(entity.getKey()) + && cachedPartitionWithLatestOffsets.get(entity.getKey()) < entity.getValue() + 1) { + cachedPartitionWithLatestOffsets.put(entity.getKey(), entity.getValue() + 1); + } + }); + this.progress.update(attachment); + } + @Override protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException { super.updateProgress(attachment); - this.progress.update(attachment); + updateProgressAndOffsetsCache(attachment); } @Override protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) { super.replayUpdateProgress(attachment); - this.progress.update(attachment); + updateProgressAndOffsetsCache(attachment); } @Override