[Improve](Job)Support other types of Job query interfaces (#24172)

- Support MTMV job
- Task info add create time and sql
- Optimize scheduling logic
This commit is contained in:
Calvin Kirs
2023-09-12 13:55:56 +08:00
committed by GitHub
parent dbf509edc0
commit 232f120edc
21 changed files with 263 additions and 94 deletions

View File

@ -2689,15 +2689,15 @@ resume_job_stmt ::=
show_job_stmt ::=
KW_SHOW KW_JOBS
{:
RESULT = new ShowJobStmt(null,null);
RESULT = new ShowJobStmt(null,null,null);
:}
| KW_SHOW KW_JOB KW_FOR job_label:jobLabel
{:
RESULT = new ShowJobStmt(jobLabel,null);
RESULT = new ShowJobStmt(null,jobLabel,null);
:}
| KW_SHOW KW_JOB KW_TASKS KW_FOR job_label:jobLabel
{:
RESULT = new ShowJobTaskStmt(jobLabel);
RESULT = new ShowJobTaskStmt(null,jobLabel);
:}
;
pause_job_stmt ::=

View File

@ -26,18 +26,21 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.scheduler.constants.JobCategory;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
/**
* SHOW JOB [FOR JobName]
* eg: show event
* return all job in connection db
* return all job in connection db
* eg: show event for test
* return job named test in connection db
* return job named test in connection db
*/
public class ShowJobStmt extends ShowStmt {
@ -57,29 +60,32 @@ public class ShowJobStmt extends ShowStmt {
.add("Status")
.add("LastExecuteFinishTime")
.add("ErrorMsg")
.add("CreateTime")
.add("Comment")
.build();
private static final String MTMV_NAME_TITLE = "mtmv_name";
private static final String NAME_TITLE = "name";
private final LabelName labelName;
@Getter
private String dbFullName; // optional
@Getter
private JobCategory jobCategory; // optional
private String jobCategoryName; // optional
@Getter
private String name; // optional
@Getter
private String pattern; // optional
public ShowJobStmt(LabelName labelName, String pattern) {
public ShowJobStmt(String category, LabelName labelName, String pattern) {
this.labelName = labelName;
this.pattern = pattern;
}
public String getDbFullName() {
return dbFullName;
}
public String getName() {
return name;
}
public String getPattern() {
return pattern;
this.jobCategoryName = category;
}
@Override
@ -87,6 +93,11 @@ public class ShowJobStmt extends ShowStmt {
super.analyze(analyzer);
checkAuth();
checkLabelName(analyzer);
if (StringUtils.isBlank(jobCategoryName)) {
this.jobCategory = JobCategory.SQL;
} else {
this.jobCategory = JobCategory.valueOf(jobCategoryName.toUpperCase());
}
}
private void checkAuth() throws AnalysisException {
@ -118,6 +129,9 @@ public class ShowJobStmt extends ShowStmt {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
for (String title : TITLE_NAMES) {
if (this.jobCategory.equals(JobCategory.MTMV) && title.equals(NAME_TITLE)) {
builder.addColumn(new Column(MTMV_NAME_TITLE, ScalarType.createVarchar(30)));
}
builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
}
return builder.build();

View File

@ -25,9 +25,12 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.scheduler.constants.JobCategory;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
@ -40,27 +43,30 @@ public class ShowJobTaskStmt extends ShowStmt {
new ImmutableList.Builder<String>()
.add("JobId")
.add("TaskId")
.add("CreateTime")
.add("StartTime")
.add("EndTime")
.add("Status")
.add("ExecuteSql")
.add("Result")
.add("ErrorMsg")
.build();
@Getter
private final LabelName labelName;
@Getter
private JobCategory jobCategory; // optional
private String jobCategoryName; // optional
@Getter
private String dbFullName; // optional
@Getter
private String name; // optional
public ShowJobTaskStmt(LabelName labelName) {
public ShowJobTaskStmt(String category, LabelName labelName) {
this.labelName = labelName;
}
public String getDbFullName() {
return dbFullName;
}
public String getName() {
return name;
this.jobCategoryName = category;
}
@Override
@ -68,6 +74,11 @@ public class ShowJobTaskStmt extends ShowStmt {
super.analyze(analyzer);
CreateJobStmt.checkAuth();
checkLabelName(analyzer);
if (StringUtils.isBlank(jobCategoryName)) {
this.jobCategory = JobCategory.SQL;
} else {
this.jobCategory = JobCategory.valueOf(jobCategoryName.toUpperCase());
}
}
private void checkLabelName(Analyzer analyzer) throws AnalysisException {

View File

@ -1523,6 +1523,7 @@ public class Env {
publishVersionDaemon.start();
// Start txn cleaner
txnCleaner.start();
timerJobManager.start();
// Alter
getAlterInstance().start();
// Consistency checker

View File

@ -193,7 +193,6 @@ import org.apache.doris.mtmv.MTMVJobManager;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.statistics.AnalysisInfo;
@ -1441,7 +1440,7 @@ public class ShowExecutor {
ShowJobTaskStmt showJobTaskStmt = (ShowJobTaskStmt) stmt;
List<List<String>> rows = Lists.newArrayList();
List<Job> jobs = Env.getCurrentEnv().getJobRegister()
.getJobs(showJobTaskStmt.getDbFullName(), showJobTaskStmt.getName(), JobCategory.SQL,
.getJobs(showJobTaskStmt.getDbFullName(), showJobTaskStmt.getName(), showJobTaskStmt.getJobCategory(),
null);
if (CollectionUtils.isEmpty(jobs)) {
resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows);
@ -1470,7 +1469,7 @@ public class ShowExecutor {
CaseSensibility.JOB.getCaseSensibility());
}
jobList = Env.getCurrentEnv().getJobRegister()
.getJobs(showJobStmt.getDbFullName(), showJobStmt.getName(), JobCategory.SQL,
.getJobs(showJobStmt.getDbFullName(), showJobStmt.getName(), showJobStmt.getJobCategory(),
matcher);
if (jobList.isEmpty()) {

View File

@ -25,25 +25,26 @@ import lombok.Getter;
public enum JobCategory {
COMMON(1, "common"),
SQL(2, "sql"),
MTMV(3, "mtmv"),
;
@Getter
private int code;
@Getter
private String description;
private String name;
JobCategory(int code, String description) {
JobCategory(int code, String name) {
this.code = code;
this.description = description;
this.name = name;
}
public static JobCategory getJobCategoryByCode(int code) {
public static JobCategory getJobCategoryByName(String name) {
for (JobCategory jobCategory : JobCategory.values()) {
if (jobCategory.getCode() == code) {
if (jobCategory.name.equalsIgnoreCase(name)) {
return jobCategory;
}
}
return null;
throw new IllegalArgumentException("Unknown job category name: " + name);
}
}

View File

@ -23,8 +23,7 @@ import org.apache.doris.scheduler.manager.TimerJobManager;
import org.apache.doris.scheduler.manager.TransientTaskManager;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.EventTranslatorTwoArg;
import com.lmax.disruptor.EventTranslatorThreeArg;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
@ -65,12 +64,13 @@ public class TaskDisruptor implements Closeable {
private boolean isClosed = false;
/**
* The default {@link EventTranslatorOneArg} to use for {@link #tryPublish(Long)}.
* The default {@link EventTranslatorThreeArg} to use for {@link #tryPublish(Long, Long)}.
* This is used to avoid creating a new object for each publish.
*/
private static final EventTranslatorTwoArg<TaskEvent, Long, TaskType> TRANSLATOR
= (event, sequence, jobId, taskType) -> {
private static final EventTranslatorThreeArg<TaskEvent, Long, Long, TaskType> TRANSLATOR
= (event, sequence, jobId, taskId, taskType) -> {
event.setId(jobId);
event.setTaskId(taskId);
event.setTaskType(taskType);
};
@ -89,15 +89,15 @@ public class TaskDisruptor implements Closeable {
/**
* Publishes a job to the disruptor.
*
* @param jobId job id
* @param jobId job id
*/
public void tryPublish(Long jobId) {
public void tryPublish(Long jobId, Long taskId) {
if (isClosed) {
log.info("tryPublish failed, disruptor is closed, jobId: {}", jobId);
return;
}
try {
disruptor.publishEvent(TRANSLATOR, jobId, TaskType.TimerJobTask);
disruptor.publishEvent(TRANSLATOR, jobId, taskId, TaskType.TimerJobTask);
} catch (Exception e) {
log.error("tryPublish failed, jobId: {}", jobId, e);
}
@ -106,7 +106,7 @@ public class TaskDisruptor implements Closeable {
/**
* Publishes a task to the disruptor.
*
* @param taskId task id
* @param taskId task id
*/
public void tryPublishTask(Long taskId) {
if (isClosed) {
@ -114,7 +114,7 @@ public class TaskDisruptor implements Closeable {
return;
}
try {
disruptor.publishEvent(TRANSLATOR, taskId, TaskType.TransientTask);
disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TransientTask);
} catch (Exception e) {
log.error("tryPublish failed, taskId: {}", taskId, e);
}

View File

@ -37,6 +37,8 @@ public class TaskEvent {
*/
private Long id;
private Long taskId;
private TaskType taskType;
public static final EventFactory<TaskEvent> FACTORY = TaskEvent::new;

View File

@ -18,8 +18,10 @@
package org.apache.doris.scheduler.disruptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.scheduler.job.ExecutorResult;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.scheduler.manager.JobTaskManager;
@ -29,8 +31,6 @@ import org.apache.doris.scheduler.manager.TransientTaskManager;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
/**
* This class represents a work handler for processing event tasks consumed by a Disruptor.
* The work handler retrieves the associated event job and executes it if it is running.
@ -89,6 +89,8 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
@SuppressWarnings("checkstyle:UnusedLocalVariable")
public void onTimerJobTaskHandle(TaskEvent taskEvent) {
long jobId = taskEvent.getId();
long taskId = taskEvent.getTaskId();
long createTimeMs = jobTaskManager.pollPrepareTaskByTaskId(jobId, taskId);
Job job = timerJobManager.getJob(jobId);
if (job == null) {
log.info("job is null, jobId: {}", jobId);
@ -99,10 +101,11 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
return;
}
log.debug("job is running, eventJobId: {}", jobId);
JobTask jobTask = new JobTask(jobId);
JobTask jobTask = new JobTask(jobId, taskId, createTimeMs);
try {
jobTask.setStartTimeMs(System.currentTimeMillis());
Object result = job.getExecutor().execute(job);
ExecutorResult result = job.getExecutor().execute(job);
job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis());
if (job.isCycleJob()) {
updateJobStatusIfPastEndTime(job);
@ -110,14 +113,27 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
// one time job should be finished after execute
updateOnceTimeJobStatus(job);
}
String resultStr = Objects.isNull(result) ? "" : result.toString();
if (null == result) {
log.warn("Job execute failed, jobId: {}, result is null", jobId);
jobTask.setErrorMsg("Job execute failed, result is null");
jobTask.setIsSuccessful(false);
timerJobManager.pauseJob(jobId);
return;
}
String resultStr = GsonUtils.GSON.toJson(result.getResult());
jobTask.setExecuteResult(resultStr);
jobTask.setIsSuccessful(true);
jobTask.setIsSuccessful(result.isSuccess());
if (!result.isSuccess()) {
log.warn("Job execute failed, jobId: {}, msg : {}", jobId, result.getExecutorSql());
jobTask.setErrorMsg(result.getExecutorSql());
timerJobManager.pauseJob(jobId);
}
jobTask.setExecuteSql(result.getExecutorSql());
} catch (Exception e) {
log.warn("Job execute failed, jobId: {}, msg : {}", jobId, e.getMessage());
job.pause(e.getMessage());
jobTask.setErrorMsg(e.getMessage());
jobTask.setIsSuccessful(false);
timerJobManager.pauseJob(jobId);
}
jobTask.setEndTimeMs(System.currentTimeMillis());
if (null == jobTaskManager) {
@ -143,7 +159,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
private void updateJobStatusIfPastEndTime(Job job) {
if (job.isExpired()) {
job.finish();
timerJobManager.finishJob(job.getJobId());
}
}

View File

@ -18,6 +18,7 @@
package org.apache.doris.scheduler.executor;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.job.ExecutorResult;
import org.apache.doris.scheduler.job.Job;
/**
@ -39,6 +40,6 @@ public interface JobExecutor<T> {
*
* @return The result of the event job execution.
*/
T execute(Job job) throws JobException;
ExecutorResult execute(Job job) throws JobException;
}

View File

@ -23,6 +23,7 @@ import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.job.ExecutorResult;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.thrift.TUniqueId;
@ -36,10 +37,9 @@ import java.util.UUID;
/**
* we use this executor to execute sql job
*
* @param <QueryState> the state of sql job, we can record the state of sql job
*/
@Slf4j
public class SqlJobExecutor<QueryState> implements JobExecutor {
public class SqlJobExecutor implements JobExecutor {
@Getter
@Setter
@ -51,7 +51,7 @@ public class SqlJobExecutor<QueryState> implements JobExecutor {
}
@Override
public String execute(Job job) throws JobException {
public ExecutorResult<String> execute(Job job) throws JobException {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName()));
@ -67,9 +67,12 @@ public class SqlJobExecutor<QueryState> implements JobExecutor {
try {
StmtExecutor executor = new StmtExecutor(ctx, sql);
executor.execute(queryId);
return convertExecuteResult(ctx, taskIdString);
String result = convertExecuteResult(ctx, taskIdString);
return new ExecutorResult<>(result, true, null, sql);
} catch (Exception e) {
throw new JobException("execute sql job failed, sql: " + sql + ", error: " + e.getMessage());
log.warn("execute sql job failed, sql: {}, error: {}", sql, e.getMessage());
return new ExecutorResult<>(null, false, e.getMessage(), sql);
}
}
@ -82,6 +85,7 @@ public class SqlJobExecutor<QueryState> implements JobExecutor {
throw new JobException("error code: " + ctx.getState().getErrorCode() + ", error msg: "
+ ctx.getState().getErrorMessage());
}
return "queryId:" + queryId + ",affectedRows : " + ctx.getState().getAffectedRows() + ", warningRows: "
+ ctx.getState().getWarningRows() + ",infoMsg" + ctx.getState().getInfoMessage();
}

View File

@ -22,7 +22,7 @@ import org.apache.doris.scheduler.exception.JobException;
/**
* A functional interface for executing a memory task.
*/
public interface TransientTaskExecutor<T> {
public interface TransientTaskExecutor {
/**
* Executes the memory task.

View File

@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.job;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class ExecutorResult<T> {
private T result;
private boolean success;
private String errorMsg;
private String executorSql;
}

View File

@ -87,6 +87,8 @@ public class Job implements Writable {
*/
@SerializedName("executor")
private JobExecutor executor;
@SerializedName("baseName")
private String baseName;
@SerializedName("user")
private String user;
@ -124,6 +126,9 @@ public class Job implements Writable {
@SerializedName("nextExecuteTimeMs")
private Long nextExecuteTimeMs = 0L;
@SerializedName("createTimeMs")
private Long createTimeMs = System.currentTimeMillis();
@SerializedName("comment")
private String comment;
@ -209,7 +214,9 @@ public class Job implements Writable {
if (endTimeMs != 0L && endTimeMs < System.currentTimeMillis()) {
throw new DdlException("endTimeMs must be greater than current time");
}
if (null != intervalUnit && null != originInterval) {
this.intervalMs = intervalUnit.getParameterValue(originInterval);
}
if (isCycleJob && (intervalMs == null || intervalMs <= 0L)) {
throw new DdlException("cycle job must set intervalMs");
}
@ -236,6 +243,9 @@ public class Job implements Writable {
List<String> row = Lists.newArrayList();
row.add(String.valueOf(jobId));
row.add(dbName);
if (jobCategory.equals(JobCategory.MTMV)) {
row.add(baseName);
}
row.add(jobName);
row.add(user);
row.add(timezone);
@ -256,6 +266,7 @@ public class Job implements Writable {
row.add(jobStatus.name());
row.add(latestCompleteExecuteTimeMs <= 0L ? "null" : TimeUtils.longToTimeString(latestCompleteExecuteTimeMs));
row.add(errMsg == null ? "null" : errMsg);
row.add(createTimeMs <= 0L ? "null" : TimeUtils.longToTimeString(createTimeMs));
row.add(comment == null ? "null" : comment);
return row;
}

View File

@ -38,27 +38,36 @@ public class JobTask implements Writable {
private Long jobId;
@SerializedName("taskId")
private Long taskId;
@SerializedName("createTimeMs")
private Long createTimeMs;
@SerializedName("startTimeMs")
private Long startTimeMs;
@SerializedName("endTimeMs")
private Long endTimeMs;
@SerializedName("successful")
private Boolean isSuccessful;
@SerializedName("executeSql")
private String executeSql;
@SerializedName("executeResult")
private String executeResult;
@SerializedName("errorMsg")
private String errorMsg;
public JobTask(Long jobId) {
public JobTask(Long jobId, Long taskId, Long createTimeMs) {
//it's enough to use nanoTime to identify a task
this.taskId = System.nanoTime();
this.taskId = taskId;
this.jobId = jobId;
this.createTimeMs = createTimeMs;
}
public List<String> getShowInfo() {
List<String> row = Lists.newArrayList();
row.add(String.valueOf(jobId));
row.add(String.valueOf(taskId));
if (null != createTimeMs) {
row.add(TimeUtils.longToTimeString(createTimeMs));
}
row.add(TimeUtils.longToTimeString(startTimeMs));
row.add(null == endTimeMs ? "null" : TimeUtils.longToTimeString(endTimeMs));
if (endTimeMs == null) {
@ -66,6 +75,11 @@ public class JobTask implements Writable {
} else {
row.add(isSuccessful ? "SUCCESS" : "FAILED");
}
if (null == executeSql) {
row.add("null");
} else {
row.add(executeSql);
}
if (null == executeResult) {
row.add("null");
} else {

View File

@ -23,8 +23,6 @@ import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.Getter;
import java.util.UUID;
/**
* This class represents a timer task that can be scheduled by a Netty timer.
* When the timer task is triggered, it produces a Job task using the Disruptor.
@ -36,16 +34,17 @@ public class TimerJobTask implements TimerTask {
private final Long jobId;
// more fields should be added here and record in feature
private final Long taskId = UUID.randomUUID().getMostSignificantBits();
private final Long taskId;
private final Long startTimestamp;
private final TaskDisruptor taskDisruptor;
public TimerJobTask(Long jobId, Long startTimestamp, TaskDisruptor taskDisruptor) {
public TimerJobTask(Long jobId, Long taskId, Long startTimestamp, TaskDisruptor taskDisruptor) {
this.jobId = jobId;
this.startTimestamp = startTimestamp;
this.taskDisruptor = taskDisruptor;
this.taskId = taskId;
}
@Override
@ -53,6 +52,6 @@ public class TimerJobTask implements TimerTask {
if (timeout.isCancelled()) {
return;
}
taskDisruptor.tryPublish(jobId);
taskDisruptor.tryPublish(jobId, taskId);
}
}

View File

@ -31,6 +31,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -44,6 +45,31 @@ public class JobTaskManager implements Writable {
private ConcurrentHashMap<Long, ConcurrentLinkedQueue<JobTask>> jobTaskMap = new ConcurrentHashMap<>(16);
/**
* taskId -> startTime
* used to record the start time of the task to be executed
* will clear when the task is executed
*/
private static ConcurrentHashMap<Long, Map<Long, Long>> prepareTaskCreateMsMap = new ConcurrentHashMap<>(16);
public static void addPrepareTaskStartTime(Long jobId, Long taskId, Long startTime) {
prepareTaskCreateMsMap.computeIfAbsent(jobId, k -> new HashMap<>());
prepareTaskCreateMsMap.get(jobId).put(taskId, startTime);
}
public static Long pollPrepareTaskByTaskId(Long jobId, Long taskId) {
if (!prepareTaskCreateMsMap.containsKey(jobId)) {
// if the job is not in the map, return current time
return System.currentTimeMillis();
}
return prepareTaskCreateMsMap.get(jobId).remove(taskId);
}
public static void clearPrepareTaskByJobId(Long jobId) {
prepareTaskCreateMsMap.remove(jobId);
}
public void addJobTask(JobTask jobTask) {
ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap
.computeIfAbsent(jobTask.getJobId(), k -> new ConcurrentLinkedQueue<>());

View File

@ -23,6 +23,7 @@ import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.constants.JobStatus;
import org.apache.doris.scheduler.disruptor.TaskDisruptor;
@ -50,9 +51,7 @@ import java.util.concurrent.TimeUnit;
public class TimerJobManager implements Closeable, Writable {
private final ConcurrentHashMap<Long, Job> jobMap = new ConcurrentHashMap<>(128);
private long lastBatchSchedulerTimestamp;
private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 600;
/**
@ -72,7 +71,7 @@ public class TimerJobManager implements Closeable, Writable {
/**
* scheduler tasks, it's used to scheduler job
*/
private final HashedWheelTimer dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660);
private HashedWheelTimer dorisTimer;
/**
* Producer and Consumer model
@ -83,8 +82,18 @@ public class TimerJobManager implements Closeable, Writable {
private TaskDisruptor disruptor;
public TimerJobManager() {
dorisTimer.start();
this.lastBatchSchedulerTimestamp = System.currentTimeMillis();
}
public void start() {
dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660);
dorisTimer.start();
Long currentTimeMs = System.currentTimeMillis();
jobMap.forEach((jobId, job) -> {
Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(),
job.getIntervalMs(), job.isCycleJob());
job.setNextExecuteTimeMs(nextExecuteTimeMs);
});
batchSchedulerTasks();
cycleSystemSchedulerTasks();
}
@ -99,6 +108,9 @@ public class TimerJobManager implements Closeable, Writable {
}
public void replayCreateJob(Job job) {
if (jobMap.containsKey(job.getJobId())) {
return;
}
jobMap.putIfAbsent(job.getJobId(), job);
initAndSchedulerJob(job);
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
@ -252,6 +264,20 @@ public class TimerJobManager implements Closeable, Writable {
Env.getCurrentEnv().getEditLog().logUpdateJob(job);
}
public void finishJob(long jobId) {
Job job = jobMap.get(jobId);
if (jobMap.get(jobId) == null) {
log.warn("update job status failed, jobId: {} not exist", jobId);
}
if (jobMap.get(jobId).getJobStatus().equals(JobStatus.FINISHED)) {
return;
}
cancelJobAllTask(job.getJobId());
job.setJobStatus(JobStatus.FINISHED);
jobMap.get(job.getJobId()).finish();
Env.getCurrentEnv().getEditLog().logUpdateJob(job);
}
private Optional<Job> findJob(String dbName, String jobName, JobCategory jobCategory) {
return jobMap.values().stream().filter(job -> checkJobMatch(job, dbName, jobName, jobCategory)).findFirst();
}
@ -297,11 +323,15 @@ public class TimerJobManager implements Closeable, Writable {
}
private List<Long> findTasksBetweenTime(Job job, Long endTimeEndWindow, Long nextExecuteTime) {
List<Long> jobExecuteTimes = new ArrayList<>();
if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) {
jobExecuteTimes.add(nextExecuteTime);
return jobExecuteTimes;
}
if (job.isCycleJob() && (nextExecuteTime > endTimeEndWindow)) {
return new ArrayList<>();
}
while (endTimeEndWindow >= nextExecuteTime) {
if (job.isTaskTimeExceeded()) {
break;
@ -345,7 +375,12 @@ public class TimerJobManager implements Closeable, Writable {
* Jobs will be re-registered after the task is completed
*/
private void cycleSystemSchedulerTasks() {
dorisTimer.newTimeout(timeout -> batchSchedulerTasks(), BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
log.info("re-register system scheduler tasks" + TimeUtils.longToTimeString(System.currentTimeMillis()));
dorisTimer.newTimeout(timeout -> {
batchSchedulerTasks();
cycleSystemSchedulerTasks();
}, BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
/**
@ -358,11 +393,12 @@ public class TimerJobManager implements Closeable, Writable {
* delay seconds, we just can be second precision
*/
public void putOneTask(Long jobId, Long startExecuteTime) {
TimerJobTask task = new TimerJobTask(jobId, startExecuteTime, disruptor);
if (isClosed) {
log.info("putOneTask failed, scheduler is closed, jobId: {}", task.getJobId());
log.info("putOneTask failed, scheduler is closed, jobId: {}", jobId);
return;
}
long taskId = System.nanoTime();
TimerJobTask task = new TimerJobTask(jobId, taskId, startExecuteTime, disruptor);
long delay = getDelaySecond(task.getStartTimestamp());
Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS);
if (timeout == null) {
@ -371,11 +407,13 @@ public class TimerJobManager implements Closeable, Writable {
}
if (jobTimeoutMap.containsKey(task.getJobId())) {
jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout);
JobTaskManager.addPrepareTaskStartTime(jobId, taskId, startExecuteTime);
return;
}
Map<Long, Timeout> timeoutMap = new ConcurrentHashMap<>();
timeoutMap.put(task.getTaskId(), timeout);
jobTimeoutMap.put(task.getJobId(), timeoutMap);
JobTaskManager.addPrepareTaskStartTime(jobId, taskId, startExecuteTime);
}
// cancel all task for one job
@ -390,14 +428,7 @@ public class TimerJobManager implements Closeable, Writable {
timeout.cancel();
}
});
}
public void stopTask(Long jobId, Long taskId) {
if (!jobTimeoutMap.containsKey(jobId)) {
return;
}
cancelJobAllTask(jobId);
jobTimeoutMap.get(jobId).remove(taskId);
JobTaskManager.clearPrepareTaskByJobId(jobId);
}
// get delay time, if startTimestamp is less than now, return 0
@ -452,7 +483,9 @@ public class TimerJobManager implements Closeable, Writable {
}
public void putOneJobToQueen(Long jobId) {
disruptor.tryPublish(jobId);
long taskId = System.nanoTime();
JobTaskManager.addPrepareTaskStartTime(jobId, taskId, System.currentTimeMillis());
disruptor.tryPublish(jobId, taskId);
}
@Override
@ -473,8 +506,7 @@ public class TimerJobManager implements Closeable, Writable {
int size = in.readInt();
for (int i = 0; i < size; i++) {
Job job = Job.readFields(in);
jobMap.put(job.getJobId(), job);
initAndSchedulerJob(job);
jobMap.putIfAbsent(job.getJobId(), job);
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.scheduler.disruptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.scheduler.executor.JobExecutor;
import org.apache.doris.scheduler.job.ExecutorResult;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.manager.TimerJobManager;
import org.apache.doris.scheduler.manager.TransientTaskManager;
@ -64,7 +65,7 @@ public class TaskDisruptorTest {
timerJobManager.getJob(anyLong);
result = job;
}};
taskDisruptor.tryPublish(job.getJobId());
taskDisruptor.tryPublish(job.getJobId(), 1L);
Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> testEventExecuteFlag);
Assertions.assertTrue(testEventExecuteFlag);
}
@ -72,9 +73,9 @@ public class TaskDisruptorTest {
class TestExecutor implements JobExecutor<Boolean> {
@Override
public Boolean execute(Job job) {
public ExecutorResult execute(Job job) {
testEventExecuteFlag = true;
return true;
return new ExecutorResult(true, true, null, "null");
}
}

View File

@ -22,6 +22,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.persist.EditLog;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.executor.JobExecutor;
import org.apache.doris.scheduler.job.ExecutorResult;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.manager.TimerJobManager;
import org.apache.doris.scheduler.manager.TransientTaskManager;
@ -60,6 +61,7 @@ public class TimerJobManagerTest {
TransientTaskManager transientTaskManager = new TransientTaskManager();
TaskDisruptor taskDisruptor = new TaskDisruptor(this.timerJobManager, transientTaskManager);
this.timerJobManager.setDisruptor(taskDisruptor);
timerJobManager.start();
}
@Test
@ -166,9 +168,9 @@ public class TimerJobManagerTest {
class TestExecutor implements JobExecutor<Boolean> {
@Override
public Boolean execute(Job job) {
public ExecutorResult execute(Job job) {
log.info("test execute count:{}", testExecuteCount.incrementAndGet());
return true;
return new ExecutorResult<>(true, true, null, "");
}
}
}