[Feature](Job)Job tasks support the choice of persistence or storage in memory (#24919)

This commit is contained in:
Calvin Kirs
2023-10-07 12:20:36 +08:00
committed by GitHub
parent 7b2ff38401
commit 0e615a531e
6 changed files with 37 additions and 21 deletions

View File

@ -58,7 +58,6 @@ public class ShowJobTaskStmt extends ShowStmt {
@Getter
private JobCategory jobCategory; // optional
private String jobCategoryName; // optional
@Getter
private String dbFullName; // optional
@Getter
@ -66,7 +65,12 @@ public class ShowJobTaskStmt extends ShowStmt {
public ShowJobTaskStmt(String category, LabelName labelName) {
this.labelName = labelName;
this.jobCategoryName = category;
String jobCategoryName = category;
if (StringUtils.isBlank(jobCategoryName)) {
this.jobCategory = JobCategory.SQL;
} else {
this.jobCategory = JobCategory.valueOf(jobCategoryName.toUpperCase());
}
}
@Override
@ -74,11 +78,6 @@ public class ShowJobTaskStmt extends ShowStmt {
super.analyze(analyzer);
CreateJobStmt.checkAuth();
checkLabelName(analyzer);
if (StringUtils.isBlank(jobCategoryName)) {
this.jobCategory = JobCategory.SQL;
} else {
this.jobCategory = JobCategory.valueOf(jobCategoryName.toUpperCase());
}
}
private void checkLabelName(Analyzer analyzer) throws AnalysisException {
@ -113,6 +112,9 @@ public class ShowJobTaskStmt extends ShowStmt {
@Override
public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
if (jobCategory.isPersistent()) {
return RedirectStatus.FORWARD_NO_SYNC;
}
return RedirectStatus.NO_FORWARD;
}
}

View File

@ -23,9 +23,9 @@ import lombok.Getter;
* The job category is used to distinguish different types of jobs.
*/
public enum JobCategory {
COMMON(1, "common"),
SQL(2, "sql"),
MTMV(3, "mtmv"),
COMMON(1, "common", true),
SQL(2, "sql", true),
MTMV(3, "mtmv", false),
;
@Getter
@ -34,9 +34,17 @@ public enum JobCategory {
@Getter
private String name;
JobCategory(int code, String name) {
/**
* if the job is persistent, it will be saved to the metadata store.
* if the job is not persistent, it will not be saved to the memory.
*/
@Getter
private boolean persistent;
JobCategory(int code, String name, boolean persistent) {
this.code = code;
this.name = name;
this.persistent = persistent;
}
public static JobCategory getJobCategoryByName(String name) {

View File

@ -139,7 +139,8 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
if (null == jobTaskManager) {
jobTaskManager = Env.getCurrentEnv().getJobTaskManager();
}
jobTaskManager.addJobTask(jobTask);
boolean isPersistent = job.getJobCategory().isPersistent();
jobTaskManager.addJobTask(jobTask, isPersistent);
}
public void onTransientTaskHandle(TaskEvent taskEvent) {

View File

@ -70,15 +70,19 @@ public class JobTaskManager implements Writable {
prepareTaskCreateMsMap.remove(jobId);
}
public void addJobTask(JobTask jobTask) {
public void addJobTask(JobTask jobTask, boolean persist) {
ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap
.computeIfAbsent(jobTask.getJobId(), k -> new ConcurrentLinkedQueue<>());
jobTasks.add(jobTask);
if (jobTasks.size() > TASK_MAX_NUM) {
JobTask oldTask = jobTasks.poll();
Env.getCurrentEnv().getEditLog().logDeleteJobTask(oldTask);
if (persist) {
Env.getCurrentEnv().getEditLog().logDeleteJobTask(oldTask);
}
}
if (persist) {
Env.getCurrentEnv().getEditLog().logCreateJobTask(jobTask);
}
Env.getCurrentEnv().getEditLog().logCreateJobTask(jobTask);
}
public List<JobTask> getJobTasks(Long jobId) {
@ -110,12 +114,10 @@ public class JobTaskManager implements Writable {
public void deleteJobTasks(Long jobId) {
ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap.get(jobId);
if (jobTasks != null) {
JobTask jobTask = jobTasks.poll();
log.info(new LogBuilder(LogKey.SCHEDULER_TASK, jobTask.getTaskId())
.add("msg", "replay delete scheduler task").build());
if (null != jobTasks) {
jobTaskMap.remove(jobId);
}
jobTaskMap.remove(jobId);
clearPrepareTaskByJobId(jobId);
}
@Override

View File

@ -136,6 +136,7 @@ public class TimerJobManager implements Closeable, Writable {
jobMap.remove(job.getJobId());
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
.add("msg", "replay delete scheduler job").build());
Env.getCurrentEnv().getJobTaskManager().deleteJobTasks(job.getJobId());
}
private void checkIsJobNameUsed(String dbName, String jobName, JobCategory jobCategory) throws DdlException {