[Feature](Job) Support manual and refactor some execution logic (#26082)

Supports manually triggered JOBs and Tasks
Optimize JOB&TASK display logic
Refactor the executor to support context passing
This commit is contained in:
Calvin Kirs
2023-10-31 20:35:55 +08:00
committed by GitHub
parent b87b09bb04
commit 08c78a1135
22 changed files with 352 additions and 150 deletions

View File

@ -28,6 +28,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.scheduler.common.IntervalUnit;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.constants.JobStatus;
import org.apache.doris.scheduler.constants.JobType;
import org.apache.doris.scheduler.executor.SqlJobExecutor;
import org.apache.doris.scheduler.job.Job;
@ -41,22 +42,21 @@ import java.util.HashSet;
/**
* syntax:
* CREATE
* [DEFINER = user]
* JOB
* event_name
* ON SCHEDULE schedule
* [COMMENT 'string']
* DO event_body;
* [DEFINER = user]
* JOB
* event_name
* ON SCHEDULE schedule
* [COMMENT 'string']
* DO event_body;
* schedule: {
* [STREAMING] AT timestamp
* | EVERY interval
* [STARTS timestamp ]
* [ENDS timestamp ]
* [STREAMING] AT timestamp
* | EVERY interval
* [STARTS timestamp ]
* [ENDS timestamp ]
* }
* interval:
* quantity { DAY | HOUR | MINUTE |
* WEEK | SECOND }
*
* quantity { DAY | HOUR | MINUTE |
* WEEK | SECOND }
*/
@Slf4j
public class CreateJobStmt extends DdlStmt {
@ -90,7 +90,7 @@ public class CreateJobStmt extends DdlStmt {
private static HashSet<String> supportStmtClassNamesCache = new HashSet<>(16);
public CreateJobStmt(LabelName labelName, String onceJobStartTimestamp, Boolean isStreamingJob,
public CreateJobStmt(LabelName labelName, String jobTypeName, String onceJobStartTimestamp,
Long interval, String intervalTimeUnit,
String startsTimeStamp, String endsTimeStamp, String comment, StatementBase doStmt) {
this.labelName = labelName;
@ -102,7 +102,8 @@ public class CreateJobStmt extends DdlStmt {
this.comment = comment;
this.stmt = doStmt;
this.job = new Job();
job.setStreamingJob(isStreamingJob);
JobType jobType = JobType.valueOf(jobTypeName.toUpperCase());
job.setJobType(jobType);
}
private String parseExecuteSql(String sql) throws AnalysisException {
@ -136,7 +137,7 @@ public class CreateJobStmt extends DdlStmt {
job.setTimezone(timezone);
job.setComment(comment);
//todo support user define
job.setUser("root");
job.setUser(ConnectContext.get().getQualifiedUser());
job.setJobStatus(JobStatus.RUNNING);
job.setJobCategory(JobCategory.SQL);
analyzerSqlStmt();
@ -172,7 +173,6 @@ public class CreateJobStmt extends DdlStmt {
private void analyzerCycleJob() throws UserException {
job.setCycleJob(true);
if (null == interval) {
throw new AnalysisException("interval is null");
}
@ -214,8 +214,6 @@ public class CreateJobStmt extends DdlStmt {
private void analyzerOnceTimeJob() throws UserException {
job.setCycleJob(false);
job.setIntervalMs(0L);
long executeAtTimeMillis = TimeUtils.timeStringToLong(onceJobStartTimestamp);

View File

@ -47,19 +47,12 @@ public class ShowJobStmt extends ShowStmt {
private static final ImmutableList<String> TITLE_NAMES =
new ImmutableList.Builder<String>()
.add("Id")
.add("Db")
.add("Name")
.add("Definer")
.add("TimeZone")
.add("ExecuteType")
.add("ExecuteAt")
.add("ExecuteInterval")
.add("ExecuteIntervalUnit")
.add("Starts")
.add("Ends")
.add("RecurringStrategy")
.add("Status")
.add("LastExecuteFinishTime")
.add("ErrorMsg")
.add("lastExecuteTaskStatus")
.add("CreateTime")
.add("Comment")
.build();

View File

@ -41,8 +41,9 @@ public class ShowJobTaskStmt extends ShowStmt {
private static final ImmutableList<String> TITLE_NAMES =
new ImmutableList.Builder<String>()
.add("JobId")
.add("TaskId")
.add("JobId")
.add("JobName")
.add("CreateTime")
.add("StartTime")
.add("EndTime")
@ -50,6 +51,7 @@ public class ShowJobTaskStmt extends ShowStmt {
.add("ExecuteSql")
.add("Result")
.add("ErrorMsg")
.add("TaskType")
.build();
@Getter

View File

@ -355,6 +355,8 @@ public class Env {
private TimerJobManager timerJobManager;
private TransientTaskManager transientTaskManager;
private JobTaskManager jobTaskManager;
private TaskDisruptor taskDisruptor;
private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos
private MasterDaemon txnCleaner; // To clean aborted or timeout txns
private Daemon feDiskUpdater; // Update fe disk info
@ -629,7 +631,7 @@ public class Env {
this.jobTaskManager = new JobTaskManager();
this.timerJobManager = new TimerJobManager();
this.transientTaskManager = new TransientTaskManager();
TaskDisruptor taskDisruptor = new TaskDisruptor(this.timerJobManager, this.transientTaskManager);
this.taskDisruptor = new TaskDisruptor(this.timerJobManager, this.transientTaskManager);
this.timerJobManager.setDisruptor(taskDisruptor);
this.transientTaskManager.setDisruptor(taskDisruptor);
this.persistentJobRegister = new TimerJobRegister(timerJobManager);
@ -1532,6 +1534,7 @@ public class Env {
publishVersionDaemon.start();
// Start txn cleaner
txnCleaner.start();
taskDisruptor.start();
timerJobManager.start();
// Alter
getAlterInstance().start();

View File

@ -1428,14 +1428,15 @@ public class ShowExecutor {
resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows);
return;
}
long jobId = jobs.get(0).getJobId();
Job job = jobs.get(0);
long jobId = job.getJobId();
List<JobTask> jobTasks = Env.getCurrentEnv().getJobTaskManager().getJobTasks(jobId);
if (CollectionUtils.isEmpty(jobTasks)) {
resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows);
return;
}
for (JobTask job : jobTasks) {
rows.add(job.getShowInfo());
for (JobTask jobTask : jobTasks) {
rows.add(jobTask.getShowInfo(job.getJobName()));
}
resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows);
}

View File

@ -29,5 +29,10 @@ public enum JobType {
/**
* JOB_TYPE_STREAMING is used to identify the streaming job.
*/
STREAMING
STREAMING,
/**
* The job will be executed manually and needs to be triggered by the user.
*/
MANUAL
}

View File

@ -18,7 +18,19 @@
package org.apache.doris.scheduler.constants;
public enum TaskType {
TimerJobTask,
/**
* Usually doesn't require persistence and is used by various asynchronous tasks, such as export tasks.
* The life cycle of this kind of task is not managed by the JOB scheduler.
*/
TRANSIENT_TASK,
TransientTask
/**
* Tasks generated by scheduled jobs.
*/
SCHEDULER_JOB_TASK,
/**
* Tasks generated by manual jobs.
*/
MANUAL_JOB_TASK
}

View File

@ -18,7 +18,6 @@
package org.apache.doris.scheduler.disruptor;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.scheduler.constants.TaskType;
import org.apache.doris.scheduler.manager.TimerJobManager;
import org.apache.doris.scheduler.manager.TransientTaskManager;
@ -29,6 +28,7 @@ import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import lombok.extern.slf4j.Slf4j;
import java.io.Closeable;
@ -47,10 +47,13 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class TaskDisruptor implements Closeable {
private final Disruptor<TaskEvent> disruptor;
private Disruptor<TaskEvent> disruptor;
private TimerJobManager timerJobManager;
private TransientTaskManager transientTaskManager;
private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size;
private static int consumerThreadCount = Config.async_task_consumer_thread_num;
private static final int consumerThreadCount = Config.async_task_consumer_thread_num;
/**
* The default timeout for {@link #close()} in seconds.
@ -75,7 +78,12 @@ public class TaskDisruptor implements Closeable {
};
public TaskDisruptor(TimerJobManager timerJobManager, TransientTaskManager transientTaskManager) {
ThreadFactory producerThreadFactory = new CustomThreadFactory("task-disruptor-producer");
this.timerJobManager = timerJobManager;
this.transientTaskManager = transientTaskManager;
}
public void start() {
ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE;
disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, producerThreadFactory,
ProducerType.SINGLE, new BlockingWaitStrategy());
WorkHandler<TaskEvent>[] workers = new TaskHandler[consumerThreadCount];
@ -88,16 +96,29 @@ public class TaskDisruptor implements Closeable {
/**
* Publishes a job to the disruptor.
* Default task type is {@link TaskType#SCHEDULER_JOB_TASK}
*
* @param jobId job id
*/
public void tryPublish(Long jobId, Long taskId) {
this.tryPublish(jobId, taskId, TaskType.SCHEDULER_JOB_TASK);
}
/**
* Publishes a job task to the disruptor.
*
* @param jobId job id, describing which job this task belongs to
* @param taskId task id; it's linked to the job id, and we can get the job detail by task id
* @param taskType {@link TaskType}
*/
public void tryPublish(Long jobId, Long taskId, TaskType taskType) {
if (isClosed) {
log.info("tryPublish failed, disruptor is closed, jobId: {}", jobId);
return;
}
try {
disruptor.publishEvent(TRANSLATOR, jobId, taskId, TaskType.TimerJobTask);
disruptor.publishEvent(TRANSLATOR, jobId, taskId, taskType);
} catch (Exception e) {
log.error("tryPublish failed, jobId: {}", jobId, e);
}
@ -105,6 +126,7 @@ public class TaskDisruptor implements Closeable {
/**
* Publishes a task to the disruptor.
* Default task type is {@link TaskType#TRANSIENT_TASK}
*
* @param taskId task id
*/
@ -114,7 +136,7 @@ public class TaskDisruptor implements Closeable {
return;
}
try {
disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TransientTask);
disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TRANSIENT_TASK);
} catch (Exception e) {
log.error("tryPublish failed, taskId: {}", taskId, e);
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.scheduler.disruptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.scheduler.constants.JobType;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.scheduler.job.ExecutorResult;
@ -70,13 +71,15 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
@Override
public void onEvent(TaskEvent event) {
switch (event.getTaskType()) {
case TimerJobTask:
case SCHEDULER_JOB_TASK:
case MANUAL_JOB_TASK:
onTimerJobTaskHandle(event);
break;
case TransientTask:
case TRANSIENT_TASK:
onTransientTaskHandle(event);
break;
default:
log.warn("unknown task type: {}", event.getTaskType());
break;
}
}
@ -90,7 +93,11 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
public void onTimerJobTaskHandle(TaskEvent taskEvent) {
long jobId = taskEvent.getId();
long taskId = taskEvent.getTaskId();
long createTimeMs = jobTaskManager.pollPrepareTaskByTaskId(jobId, taskId);
JobTask jobTask = jobTaskManager.pollPrepareTaskByTaskId(jobId, taskId);
if (jobTask == null) {
log.warn("jobTask is null, maybe it's cancel, jobId: {}, taskId: {}", jobId, taskId);
return;
}
Job job = timerJobManager.getJob(jobId);
if (job == null) {
log.info("job is null, jobId: {}", jobId);
@ -102,12 +109,12 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
}
log.debug("job is running, eventJobId: {}", jobId);
JobTask jobTask = new JobTask(jobId, taskId, createTimeMs);
try {
jobTask.setStartTimeMs(System.currentTimeMillis());
ExecutorResult result = job.getExecutor().execute(job);
ExecutorResult result = job.getExecutor().execute(job, jobTask.getContextData());
job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis());
if (job.isCycleJob()) {
if (job.getJobType().equals(JobType.RECURRING)) {
updateJobStatusIfPastEndTime(job);
} else {
// one time job should be finished after execute
@ -117,7 +124,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
log.warn("Job execute failed, jobId: {}, result is null", jobId);
jobTask.setErrorMsg("Job execute failed, result is null");
jobTask.setIsSuccessful(false);
timerJobManager.pauseJob(jobId);
timerJobManager.setJobLatestStatus(jobId, false);
return;
}
String resultStr = GsonUtils.GSON.toJson(result.getResult());
@ -126,14 +133,12 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
if (!result.isSuccess()) {
log.warn("Job execute failed, jobId: {}, msg : {}", jobId, result.getExecutorSql());
jobTask.setErrorMsg(result.getErrorMsg());
timerJobManager.pauseJob(jobId);
}
jobTask.setExecuteSql(result.getExecutorSql());
} catch (Exception e) {
log.warn("Job execute failed, jobId: {}, msg : {}", jobId, e.getMessage());
jobTask.setErrorMsg(e.getMessage());
jobTask.setIsSuccessful(false);
timerJobManager.pauseJob(jobId);
}
jobTask.setEndTimeMs(System.currentTimeMillis());
if (null == jobTaskManager) {
@ -141,6 +146,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
}
boolean isPersistent = job.getJobCategory().isPersistent();
jobTaskManager.addJobTask(jobTask, isPersistent);
timerJobManager.setJobLatestStatus(jobId, jobTask.getIsSuccessful());
}
public void onTransientTaskHandle(TaskEvent taskEvent) {
@ -165,7 +171,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
}
private void updateOnceTimeJobStatus(Job job) {
if (job.isStreamingJob()) {
if (job.getJobType().equals(JobType.STREAMING)) {
timerJobManager.putOneJobToQueen(job.getJobId());
return;
}

View File

@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.executor;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.thrift.TUniqueId;
import lombok.Getter;
import java.util.UUID;
@Getter
public abstract class AbstractJobExecutor<T, C> implements JobExecutor<T, C> {
protected ConnectContext createContext(Job job) {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName()));
ctx.setDatabase(job.getDbName());
ctx.setQualifiedUser(job.getUser());
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(job.getUser(), "%"));
ctx.getState().reset();
ctx.setThreadLocalInfo();
return ctx;
}
protected String generateTaskId() {
return UUID.randomUUID().toString();
}
protected TUniqueId generateQueryId(String taskIdString) {
UUID taskId = UUID.fromString(taskIdString);
return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
}
}

View File

@ -28,18 +28,19 @@ import org.apache.doris.scheduler.job.Job;
* We use Gson to serialize and deserialize JobExecutor, so the implementation of JobExecutor needs to be serializable.
* You can see @org.apache.doris.persist.gson.GsonUtils.java for details. When you implement JobExecutor, please make sure
* you can serialize and deserialize it.
*
* @param <T> The result type of the event job execution.
*/
@FunctionalInterface
public interface JobExecutor<T> {
public interface JobExecutor<T, C> {
/**
* Executes the event job and returns the result.
* Exceptions will be caught internally, so there is no need to define or throw them separately.
*
* @param job The event job to execute.
* @param dataContext The data context of the event job. If you need to pass parameters to the event job,
* you can use it.
* @return The result of the event job execution.
*/
ExecutorResult execute(Job job) throws JobException;
ExecutorResult<T> execute(Job job, C dataContext) throws JobException;
}

View File

@ -17,9 +17,6 @@
package org.apache.doris.scheduler.executor;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.scheduler.exception.JobException;
@ -32,15 +29,15 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
import java.util.Map;
/**
* we use this executor to execute sql job
*/
@Getter
@Slf4j
public class SqlJobExecutor implements JobExecutor {
public class SqlJobExecutor extends AbstractJobExecutor<String, Map<String, Object>> {
@Getter
@Setter
@SerializedName(value = "sql")
private String sql;
@ -50,24 +47,14 @@ public class SqlJobExecutor implements JobExecutor {
}
@Override
public ExecutorResult<String> execute(Job job) throws JobException {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName()));
ctx.setDatabase(job.getDbName());
ctx.setQualifiedUser(job.getUser());
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(job.getUser(), "%"));
ctx.getState().reset();
ctx.setThreadLocalInfo();
String taskIdString = UUID.randomUUID().toString();
UUID taskId = UUID.fromString(taskIdString);
TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
ctx.setQueryId(queryId);
public ExecutorResult<String> execute(Job job, Map<String, Object> dataContext) throws JobException {
ConnectContext ctx = createContext(job);
String taskIdString = generateTaskId();
TUniqueId queryId = generateQueryId(taskIdString);
try {
StmtExecutor executor = new StmtExecutor(ctx, sql);
executor.execute(queryId);
String result = convertExecuteResult(ctx, taskIdString);
return new ExecutorResult<>(result, true, null, sql);
} catch (Exception e) {
log.warn("execute sql job failed, job id :{}, sql: {}, error: {}", job.getJobId(), sql, e);
@ -88,4 +75,5 @@ public class SqlJobExecutor implements JobExecutor {
return "queryId:" + queryId + ",affectedRows : " + ctx.getState().getAffectedRows() + ", warningRows: "
+ ctx.getState().getWarningRows() + ",infoMsg" + ctx.getState().getInfoMessage();
}
}

View File

@ -80,6 +80,9 @@ public class Job implements Writable {
@SerializedName("jobStatus")
private JobStatus jobStatus;
@SerializedName("jobType")
private JobType jobType = JobType.RECURRING;
/**
* The executor of the job.
*
@ -93,12 +96,6 @@ public class Job implements Writable {
@SerializedName("user")
private String user;
@SerializedName("isCycleJob")
private boolean isCycleJob = false;
@SerializedName("isStreamingJob")
private boolean isStreamingJob = false;
@SerializedName("intervalMs")
private Long intervalMs = 0L;
@SerializedName("startTimeMs")
@ -129,6 +126,8 @@ public class Job implements Writable {
@SerializedName("createTimeMs")
private Long createTimeMs = System.currentTimeMillis();
private Boolean lastExecuteTaskStatus;
@SerializedName("comment")
private String comment;
@ -206,6 +205,18 @@ public class Job implements Writable {
}
public void checkJobParam() throws DdlException {
if (null == jobCategory) {
throw new DdlException("jobCategory must be set");
}
if (null == executor) {
throw new DdlException("Job executor must be set");
}
if (null == jobType) {
throw new DdlException("Job type must be set");
}
if (jobType.equals(JobType.MANUAL)) {
return;
}
if (startTimeMs != 0L && startTimeMs < System.currentTimeMillis()) {
throw new DdlException("startTimeMs must be greater than current time");
}
@ -221,15 +232,10 @@ public class Job implements Writable {
if (null != intervalUnit && null != originInterval) {
this.intervalMs = intervalUnit.getParameterValue(originInterval);
}
if (isCycleJob && (intervalMs == null || intervalMs <= 0L)) {
if (jobType.equals(JobType.RECURRING) && (intervalMs == null || intervalMs <= 0L)) {
throw new DdlException("cycle job must set intervalMs");
}
if (null == jobCategory) {
throw new DdlException("jobCategory must be set");
}
if (null == executor) {
throw new DdlException("Job executor must be set");
}
}
@ -246,33 +252,41 @@ public class Job implements Writable {
public List<String> getShowInfo() {
List<String> row = Lists.newArrayList();
row.add(String.valueOf(jobId));
row.add(dbName);
if (jobCategory.equals(JobCategory.MTMV)) {
row.add(baseName);
}
row.add(jobName);
row.add(user);
row.add(timezone);
if (isCycleJob) {
row.add(JobType.RECURRING.name());
} else {
if (isStreamingJob) {
row.add(JobType.STREAMING.name());
} else {
row.add(JobType.ONE_TIME.name());
}
}
row.add(isCycleJob ? "null" : TimeUtils.longToTimeString(startTimeMs));
row.add(isCycleJob ? originInterval.toString() : "null");
row.add(isCycleJob ? intervalUnit.name() : "null");
row.add(isCycleJob && startTimeMs > 0 ? TimeUtils.longToTimeString(startTimeMs) : "null");
row.add(isCycleJob && endTimeMs > 0 ? TimeUtils.longToTimeString(endTimeMs) : "null");
row.add(jobType.name());
row.add(convertRecurringStrategyToString());
row.add(jobStatus.name());
row.add(latestCompleteExecuteTimeMs <= 0L ? "null" : TimeUtils.longToTimeString(latestCompleteExecuteTimeMs));
row.add(errMsg == null ? "null" : errMsg);
row.add(null == lastExecuteTaskStatus ? "null" : lastExecuteTaskStatus.toString());
row.add(createTimeMs <= 0L ? "null" : TimeUtils.longToTimeString(createTimeMs));
row.add(comment == null ? "null" : comment);
return row;
}
private String convertRecurringStrategyToString() {
if (jobType.equals(JobType.MANUAL)) {
return "MANUAL TRIGGER";
}
switch (jobType) {
case ONE_TIME:
return "AT " + TimeUtils.longToTimeString(startTimeMs);
case RECURRING:
String result = "EVERY " + originInterval + " " + intervalUnit.name();
if (startTimeMs > 0) {
result += " STARTS " + TimeUtils.longToTimeString(startTimeMs);
}
if (endTimeMs > 0) {
result += " ENDS " + TimeUtils.longToTimeString(endTimeMs);
}
return result;
case STREAMING:
return "STREAMING" + (startTimeMs > 0 ? " AT " + TimeUtils.longToTimeString(startTimeMs) : "");
case MANUAL:
return "MANUAL TRIGGER";
default:
return "UNKNOWN";
}
}
}

View File

@ -21,10 +21,12 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.scheduler.constants.TaskType;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.io.DataInput;
import java.io.DataOutput;
@ -32,7 +34,8 @@ import java.io.IOException;
import java.util.List;
@Data
public class JobTask implements Writable {
@Slf4j
public class JobTask<T> implements Writable {
@SerializedName("jobId")
private Long jobId;
@ -54,6 +57,21 @@ public class JobTask implements Writable {
@SerializedName("errorMsg")
private String errorMsg;
@SerializedName("contextDataStr")
private String contextDataStr;
@SerializedName("taskType")
private TaskType taskType = TaskType.SCHEDULER_JOB_TASK;
/**
* Some parameters specific to the current task that are needed to execute the task.
* e.g. for a SQL task whose SQL is: select * from table where id = 1 order by id desc limit ${limit} offset ${offset}
* contextData is a map where key 'limit' has value 10 and key 'offset' has value 1.
* When executing the task, we will replace ${limit} with 10 and ${offset} with 1,
* so the SQL to execute is: select * from table where id = 1 order by id desc limit 10 offset 1.
*/
private T contextData;
public JobTask(Long jobId, Long taskId, Long createTimeMs) {
//it's enough to use nanoTime to identify a task
this.taskId = taskId;
@ -61,10 +79,22 @@ public class JobTask implements Writable {
this.createTimeMs = createTimeMs;
}
public List<String> getShowInfo() {
public JobTask(Long jobId, Long taskId, Long createTimeMs, T contextData) {
this(jobId, taskId, createTimeMs);
this.contextData = contextData;
try {
this.contextDataStr = GsonUtils.GSON.toJson(contextData);
} catch (Exception e) {
this.contextDataStr = null;
log.error("contextData serialize failed, jobId: {}, taskId: {}", jobId, taskId, e);
}
}
public List<String> getShowInfo(String jobName) {
List<String> row = Lists.newArrayList();
row.add(String.valueOf(jobId));
row.add(String.valueOf(taskId));
row.add(String.valueOf(jobId));
row.add(jobName);
if (null != createTimeMs) {
row.add(TimeUtils.longToTimeString(createTimeMs));
}
@ -90,6 +120,7 @@ public class JobTask implements Writable {
} else {
row.add(errorMsg);
}
row.add(taskType.name());
return row;
}

View File

@ -51,17 +51,20 @@ public class JobTaskManager implements Writable {
* used to record the prepared tasks waiting to be executed
* entries are removed once the task is executed
*/
private static ConcurrentHashMap<Long, Map<Long, Long>> prepareTaskCreateMsMap = new ConcurrentHashMap<>(16);
private static ConcurrentHashMap<Long, Map<Long, JobTask>> prepareTaskCreateMsMap = new ConcurrentHashMap<>(16);
public static void addPrepareTaskStartTime(Long jobId, Long taskId, Long startTime) {
public static void addPrepareTask(JobTask jobTask) {
long jobId = jobTask.getJobId();
long taskId = jobTask.getTaskId();
prepareTaskCreateMsMap.computeIfAbsent(jobId, k -> new HashMap<>());
prepareTaskCreateMsMap.get(jobId).put(taskId, startTime);
prepareTaskCreateMsMap.get(jobId).put(taskId, jobTask);
}
public static Long pollPrepareTaskByTaskId(Long jobId, Long taskId) {
if (!prepareTaskCreateMsMap.containsKey(jobId)) {
// if the job is not in the map, return current time
return System.currentTimeMillis();
public static JobTask pollPrepareTaskByTaskId(Long jobId, Long taskId) {
if (!prepareTaskCreateMsMap.containsKey(jobId) || !prepareTaskCreateMsMap.get(jobId).containsKey(taskId)) {
// if the job is not in the map, return new JobTask
// return new JobTask(jobId, taskId, System.currentTimeMillis()); fixme
return null;
}
return prepareTaskCreateMsMap.get(jobId).remove(taskId);
}

View File

@ -19,7 +19,6 @@ package org.apache.doris.scheduler.manager;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.io.Writable;
@ -28,8 +27,11 @@ import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.constants.JobStatus;
import org.apache.doris.scheduler.constants.JobType;
import org.apache.doris.scheduler.constants.TaskType;
import org.apache.doris.scheduler.disruptor.TaskDisruptor;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.scheduler.job.TimerJobTask;
import io.netty.util.HashedWheelTimer;
@ -88,13 +90,12 @@ public class TimerJobManager implements Closeable, Writable {
}
public void start() {
dorisTimer = new HashedWheelTimer(new CustomThreadFactory("hashed-wheel-timer"),
1, TimeUnit.SECONDS, 660);
dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660);
dorisTimer.start();
Long currentTimeMs = System.currentTimeMillis();
jobMap.forEach((jobId, job) -> {
Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(),
job.getIntervalMs(), job.isCycleJob());
job.getIntervalMs(), job.getJobType());
job.setNextExecuteTimeMs(nextExecuteTimeMs);
});
batchSchedulerTasks();
@ -152,18 +153,18 @@ public class TimerJobManager implements Closeable, Writable {
}
private void initAndSchedulerJob(Job job) {
if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
if (!job.getJobStatus().equals(JobStatus.RUNNING) || job.getJobType().equals(JobType.MANUAL)) {
return;
}
Long currentTimeMs = System.currentTimeMillis();
Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(),
job.getIntervalMs(), job.isCycleJob());
job.getIntervalMs(), job.getJobType());
job.setNextExecuteTimeMs(nextExecuteTimeMs);
if (job.getNextExecuteTimeMs() < lastBatchSchedulerTimestamp) {
List<Long> executeTimestamp = findTasksBetweenTime(job,
lastBatchSchedulerTimestamp,
job.getNextExecuteTimeMs());
job.getNextExecuteTimeMs(), job.getJobType());
if (!executeTimestamp.isEmpty()) {
for (Long timestamp : executeTimestamp) {
putOneTask(job.getJobId(), timestamp);
@ -172,7 +173,7 @@ public class TimerJobManager implements Closeable, Writable {
}
}
private Long findFistExecuteTime(Long currentTimeMs, Long startTimeMs, Long intervalMs, boolean isCycleJob) {
private Long findFistExecuteTime(Long currentTimeMs, Long startTimeMs, Long intervalMs, JobType jobType) {
// if the job is not delayed, the first execute time is the start time
if (startTimeMs != 0L && startTimeMs > currentTimeMs) {
return startTimeMs;
@ -182,13 +183,30 @@ public class TimerJobManager implements Closeable, Writable {
return currentTimeMs;
}
// if it's a cycle job and the start time is not set, the first execute time is current time + interval
if (isCycleJob && startTimeMs == 0L) {
if (jobType.equals(JobType.RECURRING) && startTimeMs == 0L) {
return currentTimeMs + intervalMs;
}
// if it's not a cycle job and it's already delayed, the first execute time is the current time
return currentTimeMs;
}
public <T> boolean immediateExecuteTask(Long jobId, T taskContextData) throws DdlException {
Job job = jobMap.get(jobId);
if (job == null) {
log.warn("immediateExecuteTask failed, jobId: {} not exist", jobId);
return false;
}
if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
log.warn("immediateExecuteTask failed, jobId: {} is not running", jobId);
return false;
}
JobTask jobTask = createInitialTask(jobId, taskContextData);
jobTask.setTaskType(TaskType.MANUAL_JOB_TASK);
JobTaskManager.addPrepareTask(jobTask);
disruptor.tryPublish(jobId, jobTask.getTaskId(), TaskType.MANUAL_JOB_TASK);
return true;
}
public void unregisterJob(Long jobId) {
jobMap.remove(jobId);
}
@ -204,6 +222,14 @@ public class TimerJobManager implements Closeable, Writable {
pauseJob(job);
}
public void setJobLatestStatus(long jobId, boolean status) {
Job job = jobMap.get(jobId);
if (jobMap.get(jobId) == null) {
log.warn("pauseJob failed, jobId: {} not exist", jobId);
}
job.setLastExecuteTaskStatus(status);
}
public void stopJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
Optional<Job> optionalJob = findJob(dbName, jobName, jobCategory);
@ -326,14 +352,14 @@ public class TimerJobManager implements Closeable, Writable {
executeJobIdsWithinLastTenMinutesWindow();
}
private List<Long> findTasksBetweenTime(Job job, Long endTimeEndWindow, Long nextExecuteTime) {
private List<Long> findTasksBetweenTime(Job job, Long endTimeEndWindow, Long nextExecuteTime, JobType jobType) {
List<Long> jobExecuteTimes = new ArrayList<>();
if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) {
if (!jobType.equals(JobType.RECURRING) && (nextExecuteTime < endTimeEndWindow)) {
jobExecuteTimes.add(nextExecuteTime);
return jobExecuteTimes;
}
if (job.isCycleJob() && (nextExecuteTime > endTimeEndWindow)) {
if (jobType.equals(JobType.RECURRING) && (nextExecuteTime > endTimeEndWindow)) {
return new ArrayList<>();
}
while (endTimeEndWindow >= nextExecuteTime) {
@ -360,11 +386,11 @@ public class TimerJobManager implements Closeable, Writable {
return;
}
jobMap.forEach((k, v) -> {
if (v.isRunning() && (v.getNextExecuteTimeMs()
if (!v.getJobType().equals(JobType.MANUAL) && v.isRunning() && (v.getNextExecuteTimeMs()
+ v.getIntervalMs() < lastBatchSchedulerTimestamp)) {
List<Long> executeTimes = findTasksBetweenTime(
v, lastBatchSchedulerTimestamp,
v.getNextExecuteTimeMs());
v.getNextExecuteTimeMs(), v.getJobType());
if (!executeTimes.isEmpty()) {
for (Long executeTime : executeTimes) {
putOneTask(v.getJobId(), executeTime);
@ -402,7 +428,8 @@ public class TimerJobManager implements Closeable, Writable {
log.info("putOneTask failed, scheduler is closed, jobId: {}", jobId);
return;
}
long taskId = System.nanoTime();
JobTask jobTask = createAsyncInitialTask(jobId, startExecuteTime);
long taskId = jobTask.getTaskId();
TimerJobTask task = new TimerJobTask(jobId, taskId, startExecuteTime, disruptor);
long delay = getDelaySecond(task.getStartTimestamp());
Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS);
@ -412,13 +439,13 @@ public class TimerJobManager implements Closeable, Writable {
}
if (jobTimeoutMap.containsKey(task.getJobId())) {
jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout);
JobTaskManager.addPrepareTaskStartTime(jobId, taskId, startExecuteTime);
JobTaskManager.addPrepareTask(jobTask);
return;
}
Map<Long, Timeout> timeoutMap = new ConcurrentHashMap<>();
timeoutMap.put(task.getTaskId(), timeout);
jobTimeoutMap.put(task.getJobId(), timeoutMap);
JobTaskManager.addPrepareTaskStartTime(jobId, taskId, startExecuteTime);
JobTaskManager.addPrepareTask(jobTask);
}
// cancel all task for one job
@ -488,9 +515,19 @@ public class TimerJobManager implements Closeable, Writable {
}
public void putOneJobToQueen(Long jobId) {
JobTask jobTask = createInitialTask(jobId, null);
JobTaskManager.addPrepareTask(jobTask);
disruptor.tryPublish(jobId, jobTask.getTaskId());
}
private JobTask createAsyncInitialTask(long jobId, long createTimeMs) {
long taskId = System.nanoTime();
JobTaskManager.addPrepareTaskStartTime(jobId, taskId, System.currentTimeMillis());
disruptor.tryPublish(jobId, taskId);
return new JobTask(jobId, taskId, createTimeMs);
}
private <T> JobTask createInitialTask(long jobId, T context) {
long taskId = System.nanoTime();
return new JobTask(jobId, taskId, System.currentTimeMillis(), context);
}
@Override

View File

@ -110,6 +110,19 @@ public interface PersistentJobRegister {
Long registerJob(Job job) throws DdlException;
/**
* Execute a job task immediately. This method will not change the job status and doesn't affect the scheduled job.
* The task type should be set to {@link org.apache.doris.scheduler.constants.TaskType#MANUAL_JOB_TASK}
*
* @param jobId job id
* @param contextData parameters to pass to the task, if any
* @param <T> context data type
* @return true if the execution is triggered successfully; false if it fails,
* e.g. when the job does not exist, is not running, or does not support manual execution
*/
<T> boolean immediateExecuteTask(Long jobId, T contextData) throws DdlException;
List<Job> getJobs(String dbFullName, String jobName, JobCategory jobCategory, PatternMatcher matcher);
/**

View File

@ -68,6 +68,11 @@ public class TimerJobRegister implements PersistentJobRegister {
return timerJobManager.registerJob(job);
}
@Override
public <T> boolean immediateExecuteTask(Long jobId, T data) throws DdlException {
return timerJobManager.immediateExecuteTask(jobId, data);
}
@Override
public void pauseJob(Long jobId) {
timerJobManager.pauseJob(jobId);