From e56fe682c961f96668fb0b59a3411fc2147151fe Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Thu, 22 Feb 2024 21:59:02 +0800 Subject: [PATCH] fix total task exec time is far more than actual (#31279) --- .../apache/doris/load/routineload/RoutineLoadJob.java | 9 ++++----- .../doris/load/routineload/RoutineLoadJobTest.java | 4 ++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 1a87328b60..a8fb467a9b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -792,17 +792,16 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl // if rate of error data is more than max_filter_ratio, pause job protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException { updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(), - attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(), - false /* not replay */); + attachment.getReceivedBytes(), false /* not replay */); } private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes, - long taskExecutionTime, boolean isReplay) throws UserException { + boolean isReplay) throws UserException { this.jobStatistic.totalRows += numOfTotalRows; this.jobStatistic.errorRows += numOfErrorRows; this.jobStatistic.unselectedRows += unselectedRows; this.jobStatistic.receivedBytes += receivedBytes; - this.jobStatistic.totalTaskExcutionTimeMs += taskExecutionTime; + this.jobStatistic.totalTaskExcutionTimeMs = System.currentTimeMillis() - createTimestamp; if (MetricRepo.isInit && !isReplay) { MetricRepo.COUNTER_ROUTINE_LOAD_ROWS.increase(numOfTotalRows); @@ -875,7 +874,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) { try { updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(), - attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(), true /* is replay */); + attachment.getReceivedBytes(), true /* is replay */); } catch (UserException e) { LOG.error("should not happen", e); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java index 7ac8820c92..df7f363381 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java @@ -288,7 +288,7 @@ public class RoutineLoadJobTest { RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(); Deencapsulation.setField(routineLoadJob, "maxErrorNum", 0); Deencapsulation.setField(routineLoadJob, "maxBatchRows", 0); - Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 1L, 1L, 0L, 1L, 1L, false); + Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 1L, 1L, 0L, 1L, false); Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, Deencapsulation.getField(routineLoadJob, "state")); @@ -303,7 +303,7 @@ public class RoutineLoadJobTest { RoutineLoadStatistic jobStatistic = Deencapsulation.getField(routineLoadJob, "jobStatistic"); Deencapsulation.setField(jobStatistic, "currentErrorRows", 1); Deencapsulation.setField(jobStatistic, "currentTotalRows", 99); - Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 2L, 0L, 0L, 1L, 1L, false); + Deencapsulation.invoke(routineLoadJob, "updateNumOfData", 2L, 0L, 0L, 1L, false); Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, Deencapsulation.getField(routineLoadJob, "state")); Assert.assertEquals(new Long(0), Deencapsulation.getField(jobStatistic, "currentErrorRows"));