From 0e615a531e9e52f7ac2e4e7418b41aac314240c9 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Sat, 7 Oct 2023 12:20:36 +0800 Subject: [PATCH] [Feature](Job)Job tasks support the choice of persistence or storage in memory (#24919) --- .../apache/doris/analysis/ShowJobTaskStmt.java | 18 ++++++++++-------- .../doris/scheduler/constants/JobCategory.java | 16 ++++++++++++---- .../doris/scheduler/disruptor/TaskHandler.java | 3 ++- .../scheduler/manager/JobTaskManager.java | 18 ++++++++++-------- .../scheduler/manager/TimerJobManager.java | 1 + .../scheduler/disruptor/TaskDisruptorTest.java | 2 ++ 6 files changed, 37 insertions(+), 21 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java index 74dca72fb7..118530341f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java @@ -58,7 +58,6 @@ public class ShowJobTaskStmt extends ShowStmt { @Getter private JobCategory jobCategory; // optional - private String jobCategoryName; // optional @Getter private String dbFullName; // optional @Getter @@ -66,7 +65,12 @@ public class ShowJobTaskStmt extends ShowStmt { public ShowJobTaskStmt(String category, LabelName labelName) { this.labelName = labelName; - this.jobCategoryName = category; + String jobCategoryName = category; + if (StringUtils.isBlank(jobCategoryName)) { + this.jobCategory = JobCategory.SQL; + } else { + this.jobCategory = JobCategory.valueOf(jobCategoryName.toUpperCase()); + } } @Override @@ -74,11 +78,6 @@ public class ShowJobTaskStmt extends ShowStmt { super.analyze(analyzer); CreateJobStmt.checkAuth(); checkLabelName(analyzer); - if (StringUtils.isBlank(jobCategoryName)) { - this.jobCategory = JobCategory.SQL; - } else { - this.jobCategory = JobCategory.valueOf(jobCategoryName.toUpperCase()); - } } private void checkLabelName(Analyzer analyzer) throws AnalysisException { @@ -113,6 +112,9 @@ public class ShowJobTaskStmt extends ShowStmt { @Override public RedirectStatus getRedirectStatus() { - return RedirectStatus.FORWARD_NO_SYNC; + if (jobCategory.isPersistent()) { + return RedirectStatus.FORWARD_NO_SYNC; + } + return RedirectStatus.NO_FORWARD; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java index 72a967625e..8578314721 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java @@ -23,9 +23,9 @@ import lombok.Getter; * The job category is used to distinguish different types of jobs. */ public enum JobCategory { - COMMON(1, "common"), - SQL(2, "sql"), - MTMV(3, "mtmv"), + COMMON(1, "common", true), + SQL(2, "sql", true), + MTMV(3, "mtmv", false), ; @Getter @@ -34,9 +34,17 @@ public enum JobCategory { @Getter private String name; - JobCategory(int code, String name) { + /** + * if the job is persistent, it will be saved to the metadata store. + * if the job is not persistent, it will not be saved to the memory. + */ + @Getter + private boolean persistent; + + JobCategory(int code, String name, boolean persistent) { this.code = code; this.name = name; + this.persistent = persistent; } public static JobCategory getJobCategoryByName(String name) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java index b9c7196e23..9daf3b7ee4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java @@ -139,7 +139,8 @@ public class TaskHandler implements WorkHandler { if (null == jobTaskManager) { jobTaskManager = Env.getCurrentEnv().getJobTaskManager(); } - jobTaskManager.addJobTask(jobTask); + boolean isPersistent = job.getJobCategory().isPersistent(); + jobTaskManager.addJobTask(jobTask, isPersistent); } public void onTransientTaskHandle(TaskEvent taskEvent) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java index 5117e2d417..7c739ba460 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java @@ -70,15 +70,19 @@ public class JobTaskManager implements Writable { prepareTaskCreateMsMap.remove(jobId); } - public void addJobTask(JobTask jobTask) { + public void addJobTask(JobTask jobTask, boolean persist) { ConcurrentLinkedQueue jobTasks = jobTaskMap .computeIfAbsent(jobTask.getJobId(), k -> new ConcurrentLinkedQueue<>()); jobTasks.add(jobTask); if (jobTasks.size() > TASK_MAX_NUM) { JobTask oldTask = jobTasks.poll(); - Env.getCurrentEnv().getEditLog().logDeleteJobTask(oldTask); + if (persist) { + Env.getCurrentEnv().getEditLog().logDeleteJobTask(oldTask); + } + } + if (persist) { + Env.getCurrentEnv().getEditLog().logCreateJobTask(jobTask); } - Env.getCurrentEnv().getEditLog().logCreateJobTask(jobTask); } public List getJobTasks(Long jobId) { @@ -110,12 +114,10 @@ public class JobTaskManager implements Writable { public void deleteJobTasks(Long jobId) { ConcurrentLinkedQueue jobTasks = jobTaskMap.get(jobId); - if (jobTasks != null) { - JobTask jobTask = jobTasks.poll(); - log.info(new LogBuilder(LogKey.SCHEDULER_TASK, jobTask.getTaskId()) - .add("msg", "replay delete scheduler task").build()); + if (null != jobTasks) { + jobTaskMap.remove(jobId); } - jobTaskMap.remove(jobId); + clearPrepareTaskByJobId(jobId); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java index 314b24f052..ed094966e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java @@ -136,6 +136,7 @@ public class TimerJobManager implements Closeable, Writable { jobMap.remove(job.getJobId()); log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) .add("msg", "replay delete scheduler job").build()); + Env.getCurrentEnv().getJobTaskManager().deleteJobTasks(job.getJobId()); } private void checkIsJobNameUsed(String dbName, String jobName, JobCategory jobCategory) throws DdlException { diff --git a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java index 878f8c5594..1036285c47 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TaskDisruptorTest.java @@ -18,6 +18,7 @@ package org.apache.doris.scheduler.disruptor; import org.apache.doris.catalog.Env; +import org.apache.doris.scheduler.constants.JobCategory; import org.apache.doris.scheduler.executor.JobExecutor; import org.apache.doris.scheduler.job.ExecutorResult; import org.apache.doris.scheduler.job.Job; @@ -61,6 +62,7 @@ public class TaskDisruptorTest { void testPublishEventAndConsumer() { Job job = new Job("test", 6000L, null, null, new TestExecutor()); + job.setJobCategory(JobCategory.COMMON); new Expectations() {{ timerJobManager.getJob(anyLong); result = job;