From e0014308bddb739765139728441b7683afbf70d1 Mon Sep 17 00:00:00 2001 From: walter Date: Wed, 27 Dec 2023 15:17:04 +0800 Subject: [PATCH] [fix](load) Recycle progress before removing bulk load job (#29066) --- .../java/org/apache/doris/load/loadv2/BulkLoadJob.java | 5 +++++ .../java/org/apache/doris/load/loadv2/LoadManager.java | 9 +++++++++ .../org/apache/doris/load/loadv2/ProgressManager.java | 4 ++++ 3 files changed, 18 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 7a9f160b3f..1939e86f85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -336,6 +336,11 @@ public abstract class BulkLoadJob extends LoadJob { return userInfo; } + public void recycleProgress() { + // Recycle memory occupied by Progress. + Env.getCurrentProgressManager().removeProgress(String.valueOf(id)); + } + @Override protected void auditFinishedLoadJob() { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 72bc61239c..3909ff5d78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -421,6 +421,9 @@ public class LoadManager implements Writable { if (job instanceof SparkLoadJob) { ((SparkLoadJob) job).clearSparkLauncherLog(); } + if (job instanceof BulkLoadJob) { + ((BulkLoadJob) job).recycleProgress(); + } if (list.isEmpty()) { map.remove(job.getLabel()); } @@ -745,6 +748,9 @@ public class LoadManager implements Writable { if (!job.isCompleted()) { continue; } + if (job instanceof BulkLoadJob) { + ((BulkLoadJob) job).recycleProgress(); + } innerIter.remove(); idToLoadJob.remove(job.getId()); ++counter; @@ -765,6 +771,9 @@ public class LoadManager implements Writable { if (!job.isCompleted()) { continue; } + if (job instanceof BulkLoadJob) { + ((BulkLoadJob) job).recycleProgress(); + } iter.remove(); idToLoadJob.remove(job.getId()); ++counter; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java index 2453af3788..25829d7c24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/ProgressManager.java @@ -45,6 +45,10 @@ public class ProgressManager { registerProgress(id, 0); } + public void removeProgress(String id) { + idToProgress.remove(id); + } + public void updateProgress(String id, TUniqueId queryId, TUniqueId fragmentId, int finishedScannerNum) { Progress progress = idToProgress.get(id); if (progress != null) {