diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 571ae7a2a1..d3c404c957 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1576,6 +1576,14 @@ public class Config extends ConfigBase { @ConfField public static int async_task_consumer_thread_num = 5; + /** + * When job is finished, it will be saved in job manager for a while. + * This configuration is used to control the max saved time. + * Default is 3 days. + */ + @ConfField + public static int finish_job_max_saved_second = 60 * 60 * 24 * 3; + // enable_workload_group should be immutable and temporarily set to mutable during the development test phase @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL) public static boolean enable_workload_group = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java index ebfdfb04a2..6d39f1cd8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java @@ -151,6 +151,10 @@ public class Job implements Writable { return jobStatus == JobStatus.STOPPED; } + public boolean isFinished() { + return jobStatus == JobStatus.FINISHED; + } + public boolean isExpired(long nextExecuteTimestamp) { if (endTimeMs == 0L) { return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java index ed094966e6..9c53d96443 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java @@ -18,6 +18,7 @@ package org.apache.doris.scheduler.manager; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.io.Writable; @@ -273,9 +274,9 @@ public class TimerJobManager implements Closeable, Writable { if (jobMap.get(jobId).getJobStatus().equals(JobStatus.FINISHED)) { return; } + job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis()); cancelJobAllTask(job.getJobId()); job.setJobStatus(JobStatus.FINISHED); - jobMap.get(job.getJobId()).finish(); Env.getCurrentEnv().getEditLog().logUpdateJob(job); } @@ -379,6 +380,7 @@ public class TimerJobManager implements Closeable, Writable { log.info("re-register system scheduler tasks" + TimeUtils.longToTimeString(System.currentTimeMillis())); dorisTimer.newTimeout(timeout -> { batchSchedulerTasks(); + clearFinishJob(); cycleSystemSchedulerTasks(); }, BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS); @@ -510,4 +512,23 @@ public class TimerJobManager implements Closeable, Writable { jobMap.putIfAbsent(job.getJobId(), job); } } + + /** + * clear finish job,if job finish time is more than @Config.finish_job_max_saved_second, we will delete it + * this method will be called every 10 minutes, therefore, the actual maximum + * deletion time is Config.finish_job_max_saved_second + 10 min. + * we could to delete job in time, but it's not make sense.start + */ + private void clearFinishJob() { + Long now = System.currentTimeMillis(); + jobMap.values().forEach(job -> { + if (job.isFinished() && now - job.getLatestCompleteExecuteTimeMs() > Config.finish_job_max_saved_second) { + jobMap.remove(job.getJobId()); + Env.getCurrentEnv().getEditLog().logDeleteJob(job); + Env.getCurrentEnv().getJobTaskManager().deleteJobTasks(job.getJobId()); + log.debug("delete finish job:{}", job.getJobId()); + } + }); + + } }