[Fix](Job)Incorrect task query result of insert type (#30024)

- IdToTask is not persisted, so the queried tasks are lost after a restart.

- Cancelling a task does not update the metadata after the task is removed from the running tasks.

- The TVF displays an error when some fields in the queried task result are empty.

- A cycle-scheduled (recurring) job should not be set to STOPPED when a task fails.
This commit is contained in:
Calvin Kirs
2024-01-22 14:46:40 +08:00
committed by yiguolei
parent 24c0900b41
commit 5c43708d92
6 changed files with 95 additions and 49 deletions

View File

@ -166,7 +166,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
throw new JobException("no running task");
}
runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst()
.orElseThrow(() -> new JobException("no task id: " + taskId)).cancel();
.orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel();
runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
updateJobStatus(JobStatus.FINISHED);
@ -289,19 +289,19 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
@Override
public void onTaskFail(T task) throws JobException {
updateJobStatusIfEnd();
updateJobStatusIfEnd(false);
runningTasks.remove(task);
}
@Override
public void onTaskSuccess(T task) throws JobException {
updateJobStatusIfEnd();
updateJobStatusIfEnd(true);
runningTasks.remove(task);
}
private void updateJobStatusIfEnd() throws JobException {
private void updateJobStatusIfEnd(boolean taskSuccess) throws JobException {
JobExecuteType executeType = getJobConfig().getExecuteType();
if (executeType.equals(JobExecuteType.MANUAL)) {
return;
@ -309,7 +309,12 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
switch (executeType) {
case ONE_TIME:
case INSTANT:
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED);
this.finishTimeMs = System.currentTimeMillis();
if (taskSuccess) {
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED);
} else {
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.STOPPED);
}
break;
case RECURRING:
TimerDefinition timerDefinition = getJobConfig().getTimerDefinition();

View File

@ -30,6 +30,7 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
@ -42,6 +43,7 @@ import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.Privilege;
@ -272,14 +274,15 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
}
if (CollectionUtils.isEmpty(historyTaskIdList)) {
historyTaskIdList = new ConcurrentLinkedQueue<>();
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
historyTaskIdList.add(id);
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
return;
}
historyTaskIdList.add(id);
if (historyTaskIdList.size() >= Config.max_persistence_task_count) {
historyTaskIdList.poll();
}
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
}
@Override
@ -320,22 +323,44 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
}
//TODO this will be refactored: we will store task info inside the job and query it from there
List<Long> taskIdList = new ArrayList<>(this.historyTaskIdList);
if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
Collections.reverse(taskIdList);
return queryLoadTasksByTaskIds(taskIdList);
}
List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList);
if (CollectionUtils.isEmpty(loadJobs)) {
return new ArrayList<>();
}
List<InsertTask> tasks = new ArrayList<>();
loadJobs.forEach(loadJob -> {
InsertTask task;
try {
task = new InsertTask(loadJob.getLabel(), loadJob.getDb().getFullName(), null, getCreateUser());
task.setCreateTimeMs(loadJob.getCreateTimestamp());
} catch (MetaNotFoundException e) {
log.warn("load job not found, job id is {}", loadJob.getId());
return;
}
task.setJobId(getJobId());
task.setTaskId(loadJob.getId());
task.setJobInfo(loadJob);
tasks.add(task);
});
return tasks;
Collections.reverse(taskIdList);
return queryLoadTasksByTaskIds(taskIdList);
}
public List<InsertTask> queryLoadTasksByTaskIds(List<Long> taskIdList) {
if (taskIdList.isEmpty()) {
return new ArrayList<>();
}
List<InsertTask> jobs = new ArrayList<>();
List<InsertTask> tasks = new ArrayList<>();
taskIdList.forEach(id -> {
if (null != idToTasks.get(id)) {
jobs.add(idToTasks.get(id));
tasks.add(idToTasks.get(id));
}
});
return jobs;
return tasks;
}
@Override
@ -354,14 +379,11 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
}
@Override
public void onTaskFail(InsertTask task) {
try {
updateJobStatus(JobStatus.STOPPED);
public void onTaskFail(InsertTask task) throws JobException {
if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, task.getErrMsg());
} catch (JobException e) {
throw new RuntimeException(e);
}
getRunningTasks().remove(task);
super.onTaskFail(task);
}
@Override

View File

@ -21,11 +21,11 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
@ -89,7 +89,7 @@ public class InsertTask extends AbstractTask {
@Getter
@Setter
private InsertJob jobInfo;
private LoadJob jobInfo;
private TaskType taskType = TaskType.PENDING;
private MergeType mergeType = MergeType.APPEND;
@ -127,7 +127,7 @@ public class InsertTask extends AbstractTask {
}
public InsertTask(String labelName, InsertIntoTableCommand insertInto,
ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
this.labelName = labelName;
this.command = insertInto;
this.userIdentity = ctx.getCurrentUserIdentity();
@ -216,23 +216,27 @@ public class InsertTask extends AbstractTask {
// if the task has not started, the load job is null; return the pending task's show info
return getPendingTaskTVFInfo();
}
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobInfo.getJobId())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobInfo.getId())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId())));
trow.addToColumnValue(new TCell().setStringVal(labelName));
trow.addToColumnValue(new TCell().setStringVal(jobInfo.getJobStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(jobInfo.getState().name()));
// err msg
String errMsg = FeConstants.null_string;
String errMsg = "";
if (failMsg != null) {
errMsg = "type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg();
}
trow.addToColumnValue(new TCell().setStringVal(errMsg));
// create time
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimestamp())));
// load end time
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimestamp())));
// tracking url
trow.addToColumnValue(new TCell().setStringVal(trackingUrl));
trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson()));
if (null != loadStatistic) {
trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson()));
} else {
trow.addToColumnValue(new TCell().setStringVal(""));
}
trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
return trow;
}
@ -244,11 +248,11 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId())));
trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId()));
trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
return trow;
}

View File

@ -137,16 +137,21 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
* @param ifExists if true, a non-existent job is silently ignored; otherwise an exception is thrown
*/
public void unregisterJob(String jobName, boolean ifExists) throws JobException {
T dropJob = null;
for (T job : jobMap.values()) {
if (job.getJobName().equals(jobName)) {
dropJob = job;
try {
T dropJob = null;
for (T job : jobMap.values()) {
if (job.getJobName().equals(jobName)) {
dropJob = job;
}
}
if (dropJob == null && ifExists) {
return;
}
dropJob(dropJob, jobName);
} catch (Exception e) {
log.error("drop job error, jobName:" + jobName, e);
throw new JobException("unregister job error, jobName:" + jobName);
}
if (dropJob == null && ifExists) {
return;
}
dropJob(dropJob, jobName);
}
private void dropJob(T dropJob, String jobName) throws JobException {
@ -284,6 +289,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
for (T job : jobMap.values()) {
if (job.getJobName().equals(jobName)) {
job.cancelTaskById(taskId);
job.logUpdateOperation();
return;
}
}
@ -378,6 +384,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
}
}
//todo this does not belong in JobManager
public void cancelLoadJob(CancelLoadStmt cs)
throws JobException, AnalysisException, DdlException {
String dbName = cs.getDbName();

View File

@ -184,8 +184,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
}
for (Map.Entry<Long, T> entry : jobMap.entrySet()) {
T job = entry.getValue();
if (job.getJobStatus().equals(JobStatus.FINISHED)) {
clearFinishedJob(job);
if (job.getJobStatus().equals(JobStatus.FINISHED) || job.getJobStatus().equals(JobStatus.STOPPED)) {
clearEndJob(job);
continue;
}
if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) {
@ -195,7 +195,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
}
}
private void clearFinishedJob(T job) {
private void clearEndJob(T job) {
if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS < System.currentTimeMillis()) {
return;
}