[fix](load) Recycle progress before removing bulk load job (#29066)

This commit is contained in:
walter
2023-12-27 15:17:04 +08:00
committed by GitHub
parent 578a7dadc1
commit e0014308bd
3 changed files with 18 additions and 0 deletions

View File

@ -336,6 +336,11 @@ public abstract class BulkLoadJob extends LoadJob {
return userInfo;
}
public void recycleProgress() {
// Recycle memory occupied by Progress.
Env.getCurrentProgressManager().removeProgress(String.valueOf(id));
}
@Override
protected void auditFinishedLoadJob() {
try {

View File

@ -421,6 +421,9 @@ public class LoadManager implements Writable {
if (job instanceof SparkLoadJob) {
((SparkLoadJob) job).clearSparkLauncherLog();
}
if (job instanceof BulkLoadJob) {
((BulkLoadJob) job).recycleProgress();
}
if (list.isEmpty()) {
map.remove(job.getLabel());
}
@ -745,6 +748,9 @@ public class LoadManager implements Writable {
if (!job.isCompleted()) {
continue;
}
if (job instanceof BulkLoadJob) {
((BulkLoadJob) job).recycleProgress();
}
innerIter.remove();
idToLoadJob.remove(job.getId());
++counter;
@ -765,6 +771,9 @@ public class LoadManager implements Writable {
if (!job.isCompleted()) {
continue;
}
if (job instanceof BulkLoadJob) {
((BulkLoadJob) job).recycleProgress();
}
iter.remove();
idToLoadJob.remove(job.getId());
++counter;

View File

@ -45,6 +45,10 @@ public class ProgressManager {
registerProgress(id, 0);
}
public void removeProgress(String id) {
idToProgress.remove(id);
}
public void updateProgress(String id, TUniqueId queryId, TUniqueId fragmentId, int finishedScannerNum) {
Progress progress = idToProgress.get(id);
if (progress != null) {