[Fix](Job)Fix the window time is not updated when no job is registered (#23628)

Fix resume job grammar definition is inconsistent
Show Job task Add execution results
JOB allows to define update operations
This commit is contained in:
Calvin Kirs
2023-08-30 09:48:21 +08:00
committed by GitHub
parent e05a0466f2
commit ca55bd88ad
13 changed files with 42 additions and 16 deletions

View File

@ -2680,7 +2680,7 @@ create_job_stmt ::=
:}
;
resume_job_stmt ::=
KW_RESUME KW_JOB job_label:jobLabel
KW_RESUME KW_JOB KW_FOR job_label:jobLabel
{:
RESULT = new ResumeJobStmt(jobLabel);
:}

View File

@ -85,7 +85,8 @@ public class CreateJobStmt extends DdlStmt {
private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
private static final ImmutableSet<Class<? extends DdlStmt>> supportStmtSuperClass
= new ImmutableSet.Builder<Class<? extends DdlStmt>>().add(InsertStmt.class).build();
= new ImmutableSet.Builder<Class<? extends DdlStmt>>().add(InsertStmt.class)
.add(UpdateStmt.class).build();
private static HashSet<String> supportStmtClassNamesCache = new HashSet<>(16);

View File

@ -43,6 +43,7 @@ public class ShowJobTaskStmt extends ShowStmt {
.add("StartTime")
.add("EndTime")
.add("Status")
.add("Result")
.add("ErrorMsg")
.build();

View File

@ -108,11 +108,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
JobTask jobTask = new JobTask(jobId);
try {
jobTask.setStartTimeMs(System.currentTimeMillis());
// TODO: We should record the result of the event task.
//Object result = job.getExecutor().execute();
job.getExecutor().execute(job);
Object result = job.getExecutor().execute(job);
job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis());
if (job.isCycleJob()) {
updateJobStatusIfPastEndTime(job);
@ -120,9 +116,11 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
// one time job should be finished after execute
updateOnceTimeJobStatus(job);
}
String resultStr = Objects.isNull(result) ? "" : result.toString();
jobTask.setExecuteResult(resultStr);
jobTask.setIsSuccessful(true);
} catch (Exception e) {
log.warn("Event job execute failed, jobId: {}, msg : {}", jobId, e.getMessage());
log.warn("Job execute failed, jobId: {}, msg : {}", jobId, e.getMessage());
job.pause(e.getMessage());
jobTask.setErrorMsg(e.getMessage());
jobTask.setIsSuccessful(false);

View File

@ -35,6 +35,7 @@ import java.util.UUID;
/**
* we use this executor to execute sql job
*
* @param <QueryState> the state of sql job, we can record the state of sql job
*/
@Slf4j
@ -50,7 +51,7 @@ public class SqlJobExecutor<QueryState> implements JobExecutor {
}
@Override
public QueryState execute(Job job) throws JobException {
public String execute(Job job) throws JobException {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName()));
@ -66,12 +67,22 @@ public class SqlJobExecutor<QueryState> implements JobExecutor {
try {
StmtExecutor executor = new StmtExecutor(ctx, sql);
executor.execute(queryId);
log.debug("execute sql job success, sql: {}, state is: {}", sql, ctx.getState());
return (QueryState) ctx.getState();
return convertExecuteResult(ctx, taskIdString);
} catch (Exception e) {
log.warn("execute sql job failed, sql: {}, error: {}", sql, e);
throw new JobException("execute sql job failed, sql: " + sql + ", error: " + e.getMessage());
}
}
private String convertExecuteResult(ConnectContext ctx, String queryId) throws JobException {
if (null == ctx.getState()) {
throw new JobException("execute sql job failed, sql: " + sql + ", error: response state is null");
}
if (null != ctx.getState().getErrorCode()) {
throw new JobException("error code: " + ctx.getState().getErrorCode() + ", error msg: "
+ ctx.getState().getErrorMessage());
}
return "queryId:" + queryId + ",affectedRows : " + ctx.getState().getAffectedRows() + ", warningRows: "
+ ctx.getState().getWarningRows() + ",infoMsg" + ctx.getState().getInfoMessage();
}
}

View File

@ -44,6 +44,8 @@ public class JobTask implements Writable {
private Long endTimeMs;
@SerializedName("successful")
private Boolean isSuccessful;
@SerializedName("executeResult")
private String executeResult;
@SerializedName("errorMsg")
private String errorMsg;
@ -64,6 +66,11 @@ public class JobTask implements Writable {
} else {
row.add(isSuccessful ? "SUCCESS" : "FAILED");
}
if (null == executeResult) {
row.add("null");
} else {
row.add(executeResult);
}
if (null == errorMsg) {
row.add("null");
} else {

View File

@ -40,7 +40,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
@Slf4j
public class JobTaskManager implements Writable {
private static final Integer TASK_MAX_NUM = Config.scheduler_job_task_max_num;
private static final Integer TASK_MAX_NUM = Config.scheduler_job_task_max_saved_count;
private ConcurrentHashMap<Long, ConcurrentLinkedQueue<JobTask>> jobTaskMap = new ConcurrentHashMap<>(16);

View File

@ -321,7 +321,13 @@ public class TimerJobManager implements Closeable, Writable {
* We will get the task in the next time window, and then hand it over to the time wheel for timing trigger
*/
private void executeJobIdsWithinLastTenMinutesWindow() {
// if the task executes for more than 10 minutes, it will be delay, so,
// set lastBatchSchedulerTimestamp to current time
if (lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS < System.currentTimeMillis()) {
this.lastBatchSchedulerTimestamp = System.currentTimeMillis();
}
if (jobMap.isEmpty()) {
this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
return;
}
jobMap.forEach((k, v) -> {