diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 2416a6bca5..091ac158c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -166,7 +166,7 @@ public abstract class AbstractJob implements Job task.getTaskId().equals(taskId)).findFirst() - .orElseThrow(() -> new JobException("no task id: " + taskId)).cancel(); + .orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel(); runningTasks.removeIf(task -> task.getTaskId().equals(taskId)); if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) { updateJobStatus(JobStatus.FINISHED); @@ -289,19 +289,19 @@ public abstract class AbstractJob implements Job implements Job> impl } if (CollectionUtils.isEmpty(historyTaskIdList)) { historyTaskIdList = new ConcurrentLinkedQueue<>(); - Env.getCurrentEnv().getEditLog().logUpdateJob(this); historyTaskIdList.add(id); + Env.getCurrentEnv().getEditLog().logUpdateJob(this); return; } historyTaskIdList.add(id); if (historyTaskIdList.size() >= Config.max_persistence_task_count) { historyTaskIdList.poll(); } + Env.getCurrentEnv().getEditLog().logUpdateJob(this); } @Override @@ -320,22 +323,44 @@ public class InsertJob extends AbstractJob> impl } //TODO it's will be refactor, we will storage task info in job inner and query from it List taskIdList = new ArrayList<>(this.historyTaskIdList); + if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { + Collections.reverse(taskIdList); + return queryLoadTasksByTaskIds(taskIdList); + } + List loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList); + if (CollectionUtils.isEmpty(loadJobs)) { + return new ArrayList<>(); + } + List tasks = new ArrayList<>(); + loadJobs.forEach(loadJob -> { + InsertTask task; + try { + task = new InsertTask(loadJob.getLabel(), loadJob.getDb().getFullName(), null, getCreateUser()); + task.setCreateTimeMs(loadJob.getCreateTimestamp()); + } catch (MetaNotFoundException e) { + log.warn("load job not found, job id is {}", loadJob.getId()); + return; + } + task.setJobId(getJobId()); + task.setTaskId(loadJob.getId()); + task.setJobInfo(loadJob); + tasks.add(task); + }); + return tasks; - Collections.reverse(taskIdList); - return queryLoadTasksByTaskIds(taskIdList); } public List queryLoadTasksByTaskIds(List taskIdList) { if (taskIdList.isEmpty()) { return new ArrayList<>(); } - List jobs = new ArrayList<>(); + List tasks = new ArrayList<>(); taskIdList.forEach(id -> { if (null != idToTasks.get(id)) { - jobs.add(idToTasks.get(id)); + tasks.add(idToTasks.get(id)); } }); - return jobs; + return tasks; } @Override @@ -354,14 +379,11 @@ public class InsertJob extends AbstractJob> impl } @Override - public void onTaskFail(InsertTask task) { - try { - updateJobStatus(JobStatus.STOPPED); + public void onTaskFail(InsertTask task) throws JobException { + if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, task.getErrMsg()); - } catch (JobException e) { - throw new RuntimeException(e); } - getRunningTasks().remove(task); + super.onTaskFail(task); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index e85e7a1b02..b5d8ea7fc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -21,11 +21,11 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ScalarType; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.load.FailMsg; +import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; @@ -89,7 +89,7 @@ public class InsertTask extends AbstractTask { @Getter @Setter - private InsertJob jobInfo; + private LoadJob jobInfo; private TaskType taskType = TaskType.PENDING; private MergeType mergeType = MergeType.APPEND; @@ -127,7 +127,7 @@ public class InsertTask extends AbstractTask { } public InsertTask(String labelName, InsertIntoTableCommand insertInto, - ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) { + ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) { this.labelName = labelName; this.command = insertInto; this.userIdentity = ctx.getCurrentUserIdentity(); @@ -216,23 +216,27 @@ public class InsertTask extends AbstractTask { // if task not start, load job is null,return pending task show info return getPendingTaskTVFInfo(); } - trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobInfo.getJobId()))); + trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobInfo.getId()))); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId()))); trow.addToColumnValue(new TCell().setStringVal(labelName)); - trow.addToColumnValue(new TCell().setStringVal(jobInfo.getJobStatus().name())); + trow.addToColumnValue(new TCell().setStringVal(jobInfo.getState().name())); // err msg - String errMsg = FeConstants.null_string; + String errMsg = ""; if (failMsg != null) { errMsg = "type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg(); } trow.addToColumnValue(new TCell().setStringVal(errMsg)); // create time - trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimestamp()))); // load end time - trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimestamp()))); // tracking url trow.addToColumnValue(new TCell().setStringVal(trackingUrl)); - trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson())); + if (null != loadStatistic) { + trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson())); + } else { + trow.addToColumnValue(new TCell().setStringVal("")); + } trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); return trow; } @@ -244,11 +248,11 @@ public class InsertTask extends AbstractTask { trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId()))); trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId())); trow.addToColumnValue(new TCell().setStringVal(getStatus().name())); - trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); + trow.addToColumnValue(new TCell().setStringVal("")); trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs()))); - trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); - trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); - trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string)); + trow.addToColumnValue(new TCell().setStringVal("")); + trow.addToColumnValue(new TCell().setStringVal("")); + trow.addToColumnValue(new TCell().setStringVal("")); trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); return trow; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index b069fd5eca..7e8b01ce28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -137,16 +137,21 @@ public class JobManager, C> implements Writable { * @param ifExists is is true, if job not exist,we will ignore job not exist exception, else throw exception */ public void unregisterJob(String jobName, boolean ifExists) throws JobException { - T dropJob = null; - for (T job : jobMap.values()) { - if (job.getJobName().equals(jobName)) { - dropJob = job; + try { + T dropJob = null; + for (T job : jobMap.values()) { + if (job.getJobName().equals(jobName)) { + dropJob = job; + } } + if (dropJob == null && ifExists) { + return; + } + dropJob(dropJob, jobName); + } catch (Exception e) { + log.error("drop job error, jobName:" + jobName, e); + throw new JobException("unregister job error, jobName:" + jobName); } - if (dropJob == null && ifExists) { - return; - } - dropJob(dropJob, jobName); } private void dropJob(T dropJob, String jobName) throws JobException { @@ -284,6 +289,7 @@ public class JobManager, C> implements Writable { for (T job : jobMap.values()) { if (job.getJobName().equals(jobName)) { job.cancelTaskById(taskId); + job.logUpdateOperation(); return; } } @@ -378,6 +384,7 @@ public class JobManager, C> implements Writable { } } + //todo it's not belong to JobManager public void cancelLoadJob(CancelLoadStmt cs) throws JobException, AnalysisException, DdlException { String dbName = cs.getDbName(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 597e39d96e..a104d3895e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -184,8 +184,8 @@ public class JobScheduler, C> implements Closeable { } for (Map.Entry entry : jobMap.entrySet()) { T job = entry.getValue(); - if (job.getJobStatus().equals(JobStatus.FINISHED)) { - clearFinishedJob(job); + if (job.getJobStatus().equals(JobStatus.FINISHED) || job.getJobStatus().equals(JobStatus.STOPPED)) { + clearEndJob(job); continue; } if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) { @@ -195,7 +195,7 @@ public class JobScheduler, C> implements Closeable { } } - private void clearFinishedJob(T job) { + private void clearEndJob(T job) { if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS < System.currentTimeMillis()) { return; } diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index f4db5907fa..d9ebb83215 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -71,9 +71,18 @@ suite("test_base_insert_job") { CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ Thread.sleep(2500) - def jobs = sql """select * from ${tableName}""" - println jobs - assert 3 >= jobs.size() >= (2 as Boolean) //at least 2 records, some times 3 records + sql """ + PAUSE JOB where jobname = '${jobName}' + """ + def tblDatas = sql """select * from ${tableName}""" + println tblDatas + assert 3 >= tblDatas.size() >= (2 as Boolean) //at least 2 records, some times 3 records + def pauseJobId = sql """select id from jobs("type"="insert") where Name='${jobName}'""" + def taskStatus = sql """select status from tasks("type"="insert") where jobid= '${pauseJobId.get(0).get(0)}'""" + println taskStatus + for (int i = 0; i < taskStatus.size(); i++) { + assert taskStatus.get(i).get(0) != "FAILED"||taskStatus.get(i).get(0) != "STOPPED"||taskStatus.get(i).get(0) != "STOPPED" + } sql """ CREATE JOB ${jobMixedName} ON SCHEDULE every 1 second DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ @@ -132,9 +141,8 @@ suite("test_base_insert_job") { sql """cancel task where jobName='${jobName}' and taskId= ${taskId}""" def cancelTask = sql """ select status from tasks("type"="insert") where jobid= ${onceJobId}""" println cancelTask - //check task status - assert cancelTask.size() == 1 - assert cancelTask.get(0).get(0) == "CANCELED" + //check task size is 0, cancel task where be deleted + assert cancelTask.size() == 0 // check table data def dataCount1 = sql """select count(1) from ${tableName}""" assert dataCount1.get(0).get(0) == 0 @@ -161,14 +169,14 @@ suite("test_base_insert_job") { assert job.size() == 1 def jobId = job.get(0).get(0); def tasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """ - assert tasks.size() == 1 + assert tasks.size() == 0 sql """ RESUME JOB where jobname = '${jobName}' """ Thread.sleep(2500) def resumeTasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """ println resumeTasks - assert resumeTasks.size() == 2 + assert resumeTasks.size() == 1 // assert same job name try { sql """