[Fix](Job)Fixed the problem of not deleting JOB during DROP JOB metadata playback (#29543)

This commit is contained in:
Calvin Kirs
2024-01-08 18:09:21 +08:00
committed by yiguolei
parent 78fc38f53e
commit 30e46ee5ad
8 changed files with 100 additions and 52 deletions

View File

@ -2623,9 +2623,9 @@ pause_job_stmt ::=
;
stop_job_stmt ::=
KW_DROP KW_JOB opt_wild_where
KW_DROP KW_JOB opt_if_exists:ifExists opt_wild_where
{:
RESULT = new AlterJobStatusStmt(parser.where,org.apache.doris.job.common.JobStatus.STOPPED);
RESULT = new AlterJobStatusStmt(parser.where,true,ifExists);
:}
;
resume_job_stmt ::=

View File

@ -36,11 +36,23 @@ public class AlterJobStatusStmt extends DdlStmt {
@Getter
private JobStatus jobStatus;
@Getter
private boolean isDrop = false;
@Getter
private boolean ifExists;
public AlterJobStatusStmt(Expr whereClause, JobStatus jobStatus) {
this.expr = whereClause;
this.jobStatus = jobStatus;
}
public AlterJobStatusStmt(Expr whereClause, boolean isDrop, boolean ifExists) {
this.expr = whereClause;
this.isDrop = isDrop;
this.ifExists = ifExists;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);

View File

@ -258,7 +258,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
if (newJobStatus.equals(JobStatus.FINISHED)) {
this.finishTimeMs = System.currentTimeMillis();
}
if (JobStatus.PAUSED.equals(newJobStatus)) {
if (JobStatus.PAUSED.equals(newJobStatus) || JobStatus.STOPPED.equals(newJobStatus)) {
cancelAllTasks();
}
jobStatus = newJobStatus;
@ -279,8 +279,8 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
Env.getCurrentEnv().getEditLog().logCreateJob(this);
}
public void logFinalOperation() {
Env.getCurrentEnv().getEditLog().logEndJob(this);
public void logDeleteOperation() {
Env.getCurrentEnv().getEditLog().logDeleteJob(this);
}
public void logUpdateOperation() {

View File

@ -36,7 +36,6 @@ import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.scheduler.JobScheduler;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.load.loadv2.JobState;
import com.google.common.collect.Lists;
@ -120,33 +119,55 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
}
}
/**
* unregister job by job id,this method will delete job from job map
* we need to check job status, if job status is running, we need to stop it
* and cancel all running task
*/
public void unregisterJob(Long jobId) throws JobException {
checkJobExist(jobId);
T dropJob = jobMap.get(jobId);
dropJob(dropJob, dropJob.getJobName());
}
/**
* unregister job by job name,this method will delete job from job map
*
* @param jobName job name
* @param ifExists is is true, if job not exist,we will ignore job not exist exception, else throw exception
*/
public void unregisterJob(String jobName, boolean ifExists) throws JobException {
T dropJob = null;
for (T job : jobMap.values()) {
if (job.getJobName().equals(jobName)) {
dropJob = job;
}
}
if (dropJob == null && ifExists) {
return;
}
dropJob(dropJob, jobName);
}
private void dropJob(T dropJob, String jobName) throws JobException {
if (dropJob == null) {
throw new JobException("job not exist, jobName:" + jobName);
}
//is job status is running, we need to stop it and cancel all running task
// since job only running in master, we don't need to write update metadata log
if (dropJob.getJobStatus().equals(JobStatus.RUNNING)) {
dropJob.updateJobStatus(JobStatus.STOPPED);
}
writeLock();
try {
checkJobExist(jobId);
jobMap.get(jobId).setJobStatus(JobStatus.STOPPED);
jobMap.get(jobId).cancelAllTasks();
jobMap.get(jobId).logFinalOperation();
jobMap.get(jobId).onUnRegister();
jobMap.remove(jobId);
// write delete log
dropJob.logDeleteOperation();
jobMap.remove(dropJob.getJobId());
} finally {
writeUnlock();
}
}
public void unregisterJob(String jobName) throws JobException {
for (T a : jobMap.values()) {
if (a.getJobName().equals(jobName)) {
try {
unregisterJob(a.getJobId());
} catch (JobException e) {
throw new JobException("unregister job error, jobName:" + jobName);
}
}
}
}
public void alterJobStatus(Long jobId, JobStatus status) throws JobException {
checkJobExist(jobId);
jobMap.get(jobId).updateJobStatus(status);
@ -157,10 +178,6 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
for (T a : jobMap.values()) {
if (a.getJobName().equals(jobName)) {
try {
if (jobStatus.equals(JobStatus.STOPPED)) {
unregisterJob(a.getJobId());
return;
}
alterJobStatus(a.getJobId(), jobStatus);
} catch (JobException e) {
throw new JobException("unregister job error, jobName:" + jobName);
@ -214,11 +231,12 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
return jobTypes.contains(job.getJobType());
}
public List<? extends AbstractTask> queryTasks(Long jobId) throws JobException {
checkJobExist(jobId);
return jobMap.get(jobId).queryAllTasks();
}
/**
* Actively trigger job execution tasks, tasks type is manual
*
* @param jobId job id
* @param context Context parameter information required by some tasks executed this time
*/
public void triggerJob(long jobId, C context) throws JobException {
log.info("trigger job, job id is {}", jobId);
checkJobExist(jobId);
@ -242,14 +260,26 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
.add("msg", "replay update scheduler job").build());
}
public void replayEndJob(T replayJob) throws JobException {
/**
* Replay delete load job. we need to remove job from job map
*/
public void replayDeleteJob(T replayJob) throws JobException {
T job = jobMap.get(replayJob.getJobId());
if (null == job) {
return;
}
jobMap.remove(replayJob.getJobId());
job.onReplayEnd(replayJob);
}
/**
* Cancel task by task id, if task is running, cancel it
* if job not exist, throw JobException exception job not exist
* if task not exist, throw JobException exception task not exist
*
* @param jobName job name
* @param taskId task id
*/
public void cancelTaskById(String jobName, Long taskId) throws JobException {
for (T job : jobMap.values()) {
if (job.getJobName().equals(jobName)) {
@ -377,7 +407,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
}
for (InsertJob loadJob : unfinishedLoadJob) {
try {
unregisterJob(loadJob.getJobId());
alterJobStatus(loadJob.getJobId(), JobStatus.STOPPED);
} catch (JobException e) {
log.warn("Fail to cancel job, its label: {}", loadJob.getLabelName());
}

View File

@ -673,7 +673,7 @@ public class EditLog {
}
case OperationType.OP_DELETE_SCHEDULER_JOB: {
AbstractJob job = (AbstractJob) journal.getData();
Env.getCurrentEnv().getJobManager().replayEndJob(job);
Env.getCurrentEnv().getJobManager().replayDeleteJob(job);
break;
}
/*case OperationType.OP_CREATE_SCHEDULER_TASK: {
@ -1625,7 +1625,7 @@ public class EditLog {
logEdit(OperationType.OP_UPDATE_SCHEDULER_JOB, job);
}
public void logEndJob(AbstractJob job) {
public void logDeleteJob(AbstractJob job) {
logEdit(OperationType.OP_DELETE_SCHEDULER_JOB, job);
}

View File

@ -197,6 +197,12 @@ public class DdlExecutor {
} else if (ddlStmt instanceof AlterJobStatusStmt) {
AlterJobStatusStmt stmt = (AlterJobStatusStmt) ddlStmt;
try {
// drop job
if (stmt.isDrop()) {
env.getJobManager().unregisterJob(stmt.getJobName(), stmt.isIfExists());
return;
}
// alter job status
env.getJobManager().alterJobStatus(stmt.getJobName(), stmt.getJobStatus());
} catch (Exception e) {
throw new DdlException(e.getMessage());

View File

@ -32,11 +32,11 @@ public class JobExecutionConfigurationTest {
configuration.setExecuteType(JobExecuteType.ONE_TIME);
TimerDefinition timerDefinition = new TimerDefinition();
timerDefinition.setStartTimeMs(System.currentTimeMillis() + 1000); // Start time set to 1 second in the future
timerDefinition.setStartTimeMs(1000L); // Start time set to 1 second in the future
configuration.setTimerDefinition(timerDefinition);
List<Long> delayTimes = configuration.getTriggerDelayTimes(
System.currentTimeMillis(), System.currentTimeMillis(), System.currentTimeMillis() + 5000);
0L, 0L, 5000L);
Assertions.assertEquals(1, delayTimes.size());
Assertions.assertEquals(1, delayTimes.get(0).longValue());

View File

@ -26,32 +26,32 @@ suite("test_base_insert_job") {
def jobMixedName = "Insert_recovery_Test_base_insert_job"
sql """drop table if exists `${tableName}` force"""
sql """
DROP JOB where jobname = '${jobName}'
DROP JOB IF EXISTS where jobname = '${jobName}'
"""
sql """
DROP JOB where jobname = 'JOB'
DROP JOB IF EXISTS where jobname = 'JOB'
"""
sql """
DROP JOB where jobname = 'DO'
DROP JOB IF EXISTS where jobname = 'DO'
"""
sql """
DROP JOB where jobname = 'AT'
DROP JOB IF EXISTS where jobname = 'AT'
"""
sql """
DROP JOB where jobname = 'SCHEDULE'
DROP JOB IF EXISTS where jobname = 'SCHEDULE'
"""
sql """
DROP JOB where jobname = 'STARTS'
DROP JOB IF EXISTS where jobname = 'STARTS'
"""
sql """
DROP JOB where jobname = 'ENDS'
DROP JOB IF EXISTS where jobname = 'ENDS'
"""
sql """
DROP JOB where jobname = '${jobMixedName}'
DROP JOB IF EXISTS where jobname = '${jobMixedName}'
"""
sql """
DROP JOB where jobname = '${jobName}'
DROP JOB IF EXISTS where jobname = '${jobName}'
"""
sql """
@ -82,10 +82,10 @@ suite("test_base_insert_job") {
assert mixedNameJobs.size() == 1 && mixedNameJobs.get(0).get(0) == jobMixedName
assert mixedNameJobs.get(0).get(1) == ''
sql """
DROP JOB where jobname = '${jobName}'
DROP JOB IF EXISTS where jobname = '${jobName}'
"""
sql """
DROP JOB where jobname = '${jobMixedName}'
DROP JOB IF EXISTS where jobname = '${jobMixedName}'
"""
sql """drop table if exists `${tableName}` force """
@ -145,7 +145,7 @@ suite("test_base_insert_job") {
//assert comment
assert oncejob.get(0).get(1) == "test for test&68686781jbjbhj//ncsa"
sql """
DROP JOB where jobname = '${jobName}'
DROP JOB IF EXISTS where jobname = '${jobName}'
"""
sql """