[Fix](Job) canceled task is not cleared from running tasks (#29114)

This commit is contained in:
Calvin Kirs
2023-12-27 13:59:12 +08:00
committed by GitHub
parent 05f185ff44
commit 0ac9b3d113
5 changed files with 98 additions and 47 deletions

View File

@ -89,7 +89,8 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
@SerializedName(value = "sql")
String executeSql;
public AbstractJob() {}
public AbstractJob() {
}
public AbstractJob(Long id) {
jobId = id;
@ -99,10 +100,10 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
* executeSql and runningTasks is not required for load.
*/
public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
String currentDbName,
String comment,
UserIdentity createUser,
JobExecutionConfiguration jobConfig) {
String currentDbName,
String comment,
UserIdentity createUser,
JobExecutionConfiguration jobConfig) {
this(jobId, jobName, jobStatus, currentDbName, comment,
createUser, jobConfig, System.currentTimeMillis(), null, null);
}
@ -137,6 +138,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
for (T task : runningTasks) {
task.cancel();
}
runningTasks = new ArrayList<>();
}
private static final ImmutableList<String> TITLE_NAMES =
@ -163,6 +165,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
}
runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst()
.orElseThrow(() -> new JobException("no task id: " + taskId)).cancel();
runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
updateJobStatus(JobStatus.FINISHED);
}
@ -364,10 +367,12 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
}
@Override
public void onRegister() throws JobException {}
public void onRegister() throws JobException {
}
@Override
public void onUnRegister() throws JobException {}
public void onUnRegister() throws JobException {
}
@Override
public void onReplayCreate() throws JobException {

View File

@ -46,6 +46,7 @@ import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.Privilege;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
@ -67,7 +68,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
@ -84,7 +84,7 @@ import java.util.stream.Collectors;
@EqualsAndHashCode(callSuper = true)
@Data
@Slf4j
public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> {
public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> implements GsonPostProcessable {
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("Id", ScalarType.createStringType()),
@ -156,6 +156,31 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> {
// max save task num, do we need to config it?
private static final int MAX_SAVE_TASK_NUM = 100;
@Override
public void gsonPostProcess() throws IOException {
    // After GSON deserialization, any collection field that was absent from
    // the persisted JSON is left at its Java default (null). Re-initialize
    // each one to an empty instance so callers never have to null-check.
    if (plans == null) {
        plans = new ArrayList<>();
    }
    if (idToTasks == null) {
        idToTasks = new ConcurrentHashMap<>();
    }
    if (loadStatistic == null) {
        loadStatistic = new LoadStatistic();
    }
    if (finishedTaskIds == null) {
        finishedTaskIds = new HashSet<>();
    }
    if (errorTabletInfos == null) {
        errorTabletInfos = new ArrayList<>();
    }
    if (commitInfos == null) {
        commitInfos = new ArrayList<>();
    }
    if (historyTaskIdList == null) {
        historyTaskIdList = new ConcurrentLinkedQueue<>();
    }
}
/**
* load job type
*/
@ -197,13 +222,13 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> {
}
public InsertJob(ConnectContext ctx,
StmtExecutor executor,
String labelName,
List<InsertIntoTableCommand> plans,
Set<String> sinkTableNames,
Map<String, String> properties,
String comment,
JobExecutionConfiguration jobConfig) {
StmtExecutor executor,
String labelName,
List<InsertIntoTableCommand> plans,
Set<String> sinkTableNames,
Map<String, String> properties,
String comment,
JobExecutionConfiguration jobConfig) {
super(getNextJobId(), labelName, JobStatus.RUNNING, null,
comment, ctx.getCurrentUserIdentity(), jobConfig);
this.ctx = ctx;
@ -460,14 +485,6 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> {
return Config.broker_load_default_timeout_second;
}
public static InsertJob readFields(DataInput in) throws IOException {
    // Jobs are persisted as a single JSON string; decode it back into an
    // InsertJob and reset the transient running-task list, which is not
    // serialized and must never be null after load.
    String serialized = Text.readString(in);
    InsertJob job = GsonUtils.GSON.fromJson(serialized, InsertJob.class);
    job.setRunningTasks(new ArrayList<>());
    return job;
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));

View File

@ -220,6 +220,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
}
public void triggerJob(long jobId, C context) throws JobException {
    log.info("trigger job, job id is {}", jobId);
    // Validate the id before scheduling; throws JobException when unknown.
    checkJobExist(jobId);
    T job = jobMap.get(jobId);
    jobScheduler.schedulerInstantJob(job, TaskType.MANUAL, context);
}

View File

@ -154,7 +154,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
public void schedulerInstantJob(T job, TaskType taskType, C context) {
List<? extends AbstractTask> tasks = job.commonCreateTasks(taskType, context);
if (CollectionUtils.isEmpty(tasks)) {
log.info("job create task is empty, skip scheduler, job id is {},job name is {}", job.getJobId(),
log.info("job create task is empty, skip scheduler, job id is {}, job name is {}", job.getJobId(),
job.getJobName());
if (job.getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
job.setJobStatus(JobStatus.FINISHED);