diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 05ba4ee9e1..70b2f4220d 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -2623,9 +2623,9 @@ pause_job_stmt ::= ; stop_job_stmt ::= - KW_DROP KW_JOB opt_wild_where + KW_DROP KW_JOB opt_if_exists:ifExists opt_wild_where {: - RESULT = new AlterJobStatusStmt(parser.where,org.apache.doris.job.common.JobStatus.STOPPED); + RESULT = new AlterJobStatusStmt(parser.where,true,ifExists); :} ; resume_job_stmt ::= diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterJobStatusStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterJobStatusStmt.java index 16077cc260..31a35aca2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterJobStatusStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterJobStatusStmt.java @@ -36,11 +36,23 @@ public class AlterJobStatusStmt extends DdlStmt { @Getter private JobStatus jobStatus; + @Getter + private boolean isDrop = false; + + @Getter + private boolean ifExists; + public AlterJobStatusStmt(Expr whereClause, JobStatus jobStatus) { this.expr = whereClause; this.jobStatus = jobStatus; } + public AlterJobStatusStmt(Expr whereClause, boolean isDrop, boolean ifExists) { + this.expr = whereClause; + this.isDrop = isDrop; + this.ifExists = ifExists; + } + @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 9d7b817f73..2416a6bca5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -258,7 +258,7 @@ public abstract class AbstractJob implements Job implements Job, C> implements Writable { } } + /** + * unregister job by job id,this method will delete job from job map + * we need to check job status, if job status is running, we need to stop it + * and cancel all running task + */ public void unregisterJob(Long jobId) throws JobException { + checkJobExist(jobId); + T dropJob = jobMap.get(jobId); + dropJob(dropJob, dropJob.getJobName()); + } + + /** + * unregister job by job name,this method will delete job from job map + * + * @param jobName job name + * @param ifExists is is true, if job not exist,we will ignore job not exist exception, else throw exception + */ + public void unregisterJob(String jobName, boolean ifExists) throws JobException { + T dropJob = null; + for (T job : jobMap.values()) { + if (job.getJobName().equals(jobName)) { + dropJob = job; + } + } + if (dropJob == null && ifExists) { + return; + } + dropJob(dropJob, jobName); + } + + private void dropJob(T dropJob, String jobName) throws JobException { + if (dropJob == null) { + throw new JobException("job not exist, jobName:" + jobName); + } + //is job status is running, we need to stop it and cancel all running task + // since job only running in master, we don't need to write update metadata log + if (dropJob.getJobStatus().equals(JobStatus.RUNNING)) { + dropJob.updateJobStatus(JobStatus.STOPPED); + } writeLock(); try { - checkJobExist(jobId); - jobMap.get(jobId).setJobStatus(JobStatus.STOPPED); - jobMap.get(jobId).cancelAllTasks(); - jobMap.get(jobId).logFinalOperation(); - jobMap.get(jobId).onUnRegister(); - jobMap.remove(jobId); + // write delete log + dropJob.logDeleteOperation(); + jobMap.remove(dropJob.getJobId()); } finally { writeUnlock(); } } - public void unregisterJob(String jobName) throws JobException { - for (T a : jobMap.values()) { - if (a.getJobName().equals(jobName)) { - try { - unregisterJob(a.getJobId()); - } catch (JobException e) { - throw new JobException("unregister job error, jobName:" + jobName); - } - } - } - - } - public void alterJobStatus(Long jobId, JobStatus status) throws JobException { checkJobExist(jobId); jobMap.get(jobId).updateJobStatus(status); @@ -157,10 +178,6 @@ public class JobManager, C> implements Writable { for (T a : jobMap.values()) { if (a.getJobName().equals(jobName)) { try { - if (jobStatus.equals(JobStatus.STOPPED)) { - unregisterJob(a.getJobId()); - return; - } alterJobStatus(a.getJobId(), jobStatus); } catch (JobException e) { throw new JobException("unregister job error, jobName:" + jobName); @@ -214,11 +231,12 @@ public class JobManager, C> implements Writable { return jobTypes.contains(job.getJobType()); } - public List queryTasks(Long jobId) throws JobException { - checkJobExist(jobId); - return jobMap.get(jobId).queryAllTasks(); - } - + /** + * Actively trigger job execution tasks, tasks type is manual + * + * @param jobId job id + * @param context Context parameter information required by some tasks executed this time + */ public void triggerJob(long jobId, C context) throws JobException { log.info("trigger job, job id is {}", jobId); checkJobExist(jobId); @@ -242,14 +260,26 @@ public class JobManager, C> implements Writable { .add("msg", "replay update scheduler job").build()); } - public void replayEndJob(T replayJob) throws JobException { + /** + * Replay delete load job. we need to remove job from job map + */ + public void replayDeleteJob(T replayJob) throws JobException { T job = jobMap.get(replayJob.getJobId()); if (null == job) { return; } + jobMap.remove(replayJob.getJobId()); job.onReplayEnd(replayJob); } + /** + * Cancel task by task id, if task is running, cancel it + * if job not exist, throw JobException exception job not exist + * if task not exist, throw JobException exception task not exist + * + * @param jobName job name + * @param taskId task id + */ public void cancelTaskById(String jobName, Long taskId) throws JobException { for (T job : jobMap.values()) { if (job.getJobName().equals(jobName)) { @@ -377,7 +407,7 @@ public class JobManager, C> implements Writable { } for (InsertJob loadJob : unfinishedLoadJob) { try { - unregisterJob(loadJob.getJobId()); + alterJobStatus(loadJob.getJobId(), JobStatus.STOPPED); } catch (JobException e) { log.warn("Fail to cancel job, its label: {}", loadJob.getLabelName()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index cbbae9d178..de2d5e92ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -673,7 +673,7 @@ public class EditLog { } case OperationType.OP_DELETE_SCHEDULER_JOB: { AbstractJob job = (AbstractJob) journal.getData(); - Env.getCurrentEnv().getJobManager().replayEndJob(job); + Env.getCurrentEnv().getJobManager().replayDeleteJob(job); break; } /*case OperationType.OP_CREATE_SCHEDULER_TASK: { @@ -1625,7 +1625,7 @@ public class EditLog { logEdit(OperationType.OP_UPDATE_SCHEDULER_JOB, job); } - public void logEndJob(AbstractJob job) { + public void logDeleteJob(AbstractJob job) { logEdit(OperationType.OP_DELETE_SCHEDULER_JOB, job); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 51dc1eb731..ef6a47de69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -197,6 +197,12 @@ public class DdlExecutor { } else if (ddlStmt instanceof AlterJobStatusStmt) { AlterJobStatusStmt stmt = (AlterJobStatusStmt) ddlStmt; try { + // drop job + if (stmt.isDrop()) { + env.getJobManager().unregisterJob(stmt.getJobName(), stmt.isIfExists()); + return; + } + // alter job status env.getJobManager().alterJobStatus(stmt.getJobName(), stmt.getJobStatus()); } catch (Exception e) { throw new DdlException(e.getMessage()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index 91678ee5c1..053d41bd0b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -32,11 +32,11 @@ public class JobExecutionConfigurationTest { configuration.setExecuteType(JobExecuteType.ONE_TIME); TimerDefinition timerDefinition = new TimerDefinition(); - timerDefinition.setStartTimeMs(System.currentTimeMillis() + 1000); // Start time set to 1 second in the future + timerDefinition.setStartTimeMs(1000L); // Start time set to 1 second in the future configuration.setTimerDefinition(timerDefinition); List delayTimes = configuration.getTriggerDelayTimes( - System.currentTimeMillis(), System.currentTimeMillis(), System.currentTimeMillis() + 5000); + 0L, 0L, 5000L); Assertions.assertEquals(1, delayTimes.size()); Assertions.assertEquals(1, delayTimes.get(0).longValue()); diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index 3ef2f86b8e..f4db5907fa 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -26,32 +26,32 @@ suite("test_base_insert_job") { def jobMixedName = "Insert_recovery_Test_base_insert_job" sql """drop table if exists `${tableName}` force""" sql """ - DROP JOB where jobname = '${jobName}' + DROP JOB IF EXISTS where jobname = '${jobName}' """ sql """ - DROP JOB where jobname = 'JOB' + DROP JOB IF EXISTS where jobname = 'JOB' """ sql """ - DROP JOB where jobname = 'DO' + DROP JOB IF EXISTS where jobname = 'DO' """ sql """ - DROP JOB where jobname = 'AT' + DROP JOB IF EXISTS where jobname = 'AT' """ sql """ - DROP JOB where jobname = 'SCHEDULE' + DROP JOB IF EXISTS where jobname = 'SCHEDULE' """ sql """ - DROP JOB where jobname = 'STARTS' + DROP JOB IF EXISTS where jobname = 'STARTS' """ sql """ - DROP JOB where jobname = 'ENDS' + DROP JOB IF EXISTS where jobname = 'ENDS' """ sql """ - DROP JOB where jobname = '${jobMixedName}' + DROP JOB IF EXISTS where jobname = '${jobMixedName}' """ sql """ - DROP JOB where jobname = '${jobName}' + DROP JOB IF EXISTS where jobname = '${jobName}' """ sql """ @@ -82,10 +82,10 @@ suite("test_base_insert_job") { assert mixedNameJobs.size() == 1 && mixedNameJobs.get(0).get(0) == jobMixedName assert mixedNameJobs.get(0).get(1) == '' sql """ - DROP JOB where jobname = '${jobName}' + DROP JOB IF EXISTS where jobname = '${jobName}' """ sql """ - DROP JOB where jobname = '${jobMixedName}' + DROP JOB IF EXISTS where jobname = '${jobMixedName}' """ sql """drop table if exists `${tableName}` force """ @@ -145,7 +145,7 @@ suite("test_base_insert_job") { //assert comment assert oncejob.get(0).get(1) == "test for test&68686781jbjbhj//ncsa" sql """ - DROP JOB where jobname = '${jobName}' + DROP JOB IF EXISTS where jobname = '${jobName}' """ sql """