# [refactor](Job) Refactor JOB (#26845)

## Motivation:
In the past, JOB only supported timer JOBs, which could schedule tasks only at fixed times. JOB was also solely responsible for task execution, and states could become inconsistent during execution: a task might finish successfully while the task status recorded by the JOB was never updated.
Because of this inconsistency, the correctness of a JOB's status could not be guaranteed. As more businesses were gradually integrated into JOB, such as the export job and the MTMV job, we found that scaling became difficult and the JOB module grew particularly bulky. We have therefore decided to refactor JOB.

## Refactoring Goals:
- Provide a unified external registration interface so that all JOBs can be registered through it and scheduled by the JobScheduler (a registration sketch follows this list).

- The JobScheduler can schedule instant JOBs, timer JOBs, and manual JOBs.

- JOB should provide a unified external extension class. All JOBs can be extended through this class, which supplies common functionality such as JOB status restoration and Task execution.

- Extended JOBs should manage task states on their own to avoid inconsistent state maintenance issues.

- Different JOBs should use their own thread pools for processing to prevent inter-JOB interference.
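
A minimal sketch of what registering a job could look like under this design. The `registerJob` call is a placeholder name, since this excerpt only shows that `Env` exposes `getJobManager()` and does not spell out JobManager's registration API; `IntervalUnit.MINUTE` is likewise assumed, matching the second/minute/hour/day/week units referenced elsewhere in this commit. The other classes and setters are taken from this PR:

```java
// Sketch only: builds a recurring job with the classes introduced in this PR.
InsertJob job = new InsertJob();
job.setJobName("demo_job");
job.setCreateTimeMs(System.currentTimeMillis());

// Run every minute (IntervalUnit.MINUTE is an assumption, see above).
TimerDefinition timer = new TimerDefinition();
timer.setInterval(1L);
timer.setIntervalUnit(IntervalUnit.MINUTE);

JobExecutionConfiguration config = new JobExecutionConfiguration();
config.setExecuteType(JobExecuteType.RECURRING);
config.setTimerDefinition(timer);
job.setJobConfig(config);

// Placeholder call: the registration method name is an assumption; the diff
// below only shows that Env exposes getJobManager().
Env.getCurrentEnv().getJobManager().registerJob(job);
```
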
### Design:
- The JOBManager provides a unified registration interface through which all JOBs can register and then be scheduled by the JobScheduler.
- The TimerJob component periodically fetches the JOBs to be scheduled within the next time window and hands them to the Time Wheel for triggering. To prevent tasks from piling up in the Time Wheel, triggered JOBs are passed to a dispatch thread pool, which assigns their tasks to the corresponding thread pools for execution.
- A Manual or Instant JOB hands its tasks directly to the corresponding thread pool for execution.
- JOB provides a unified extension class that all JOBs can extend; it supplies common functionality such as JOB status restoration and Task execution.
- To implement a new JOB, one only needs to extend AbstractJob and AbstractTask; see the sketch after the diagram below.
<img width="926" alt="image" src="https://github.com/apache/doris/assets/16631152/3032e05d-133e-425b-b31e-4bb492f06ddc">

## NOTICE
This will make the master branch's metadata incompatible with previous versions.
Commit 13bc6b702b (parent 6b8ec22436), authored by Calvin Kirs on 2023-11-14 18:18:59 +08:00, committed via GitHub.
60 changed files with 2256 additions and 2452 deletions


@@ -23,14 +23,15 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.base.TimerDefinition;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.scheduler.common.IntervalUnit;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.constants.JobStatus;
import org.apache.doris.scheduler.constants.JobType;
import org.apache.doris.scheduler.executor.SqlJobExecutor;
import org.apache.doris.scheduler.job.Job;
import com.google.common.collect.ImmutableSet;
import lombok.Getter;
@@ -61,12 +62,11 @@ import java.util.HashSet;
@Slf4j
public class CreateJobStmt extends DdlStmt {
@Getter
private StatementBase doStmt;
@Getter
private StatementBase stmt;
@Getter
private Job job;
private AbstractJob<?> jobInstance;
private final LabelName labelName;
@@ -81,6 +81,7 @@ public class CreateJobStmt extends DdlStmt {
private final String endsTimeStamp;
private final String comment;
private JobExecuteType executeType;
private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
@@ -88,9 +89,9 @@ public class CreateJobStmt extends DdlStmt {
= new ImmutableSet.Builder<Class<? extends DdlStmt>>().add(InsertStmt.class)
.add(UpdateStmt.class).build();
private static HashSet<String> supportStmtClassNamesCache = new HashSet<>(16);
private static final HashSet<String> supportStmtClassNamesCache = new HashSet<>(16);
public CreateJobStmt(LabelName labelName, String jobTypeName, String onceJobStartTimestamp,
public CreateJobStmt(LabelName labelName, JobExecuteType executeType, String onceJobStartTimestamp,
Long interval, String intervalTimeUnit,
String startsTimeStamp, String endsTimeStamp, String comment, StatementBase doStmt) {
this.labelName = labelName;
@@ -100,10 +101,78 @@ public class CreateJobStmt extends DdlStmt {
this.startsTimeStamp = startsTimeStamp;
this.endsTimeStamp = endsTimeStamp;
this.comment = comment;
this.stmt = doStmt;
this.job = new Job();
JobType jobType = JobType.valueOf(jobTypeName.toUpperCase());
job.setJobType(jobType);
this.doStmt = doStmt;
this.executeType = executeType;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
checkAuth();
labelName.analyze(analyzer);
String dbName = labelName.getDbName();
Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
analyzerSqlStmt();
// check the stmt; currently only insert stmt is supported
// todo: use InsertIntoCommand if the job is an InsertJob
InsertJob job = new InsertJob();
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
jobExecutionConfiguration.setExecuteType(executeType);
job.setCreateTimeMs(System.currentTimeMillis());
TimerDefinition timerDefinition = new TimerDefinition();
if (null != onceJobStartTimestamp) {
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(onceJobStartTimestamp));
}
if (null != interval) {
timerDefinition.setInterval(interval);
}
if (null != intervalTimeUnit) {
timerDefinition.setIntervalUnit(IntervalUnit.valueOf(intervalTimeUnit.toUpperCase()));
}
if (null != startsTimeStamp) {
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp));
}
if (null != endsTimeStamp) {
timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(endsTimeStamp));
}
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
job.setJobConfig(jobExecutionConfiguration);
job.setComment(comment);
job.setCurrentDbName(labelName.getDbName());
job.setJobName(labelName.getLabelName());
job.setCreateUser(ConnectContext.get().getCurrentUserIdentity());
job.setJobStatus(JobStatus.RUNNING);
job.checkJobParams();
String originStmt = getOrigStmt().originStmt;
String executeSql = parseExecuteSql(originStmt);
job.setExecuteSql(executeSql);
jobInstance = job;
}
protected static void checkAuth() throws AnalysisException {
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
}
private void checkStmtSupport() throws AnalysisException {
if (supportStmtClassNamesCache.contains(doStmt.getClass().getSimpleName())) {
return;
}
for (Class<? extends DdlStmt> clazz : supportStmtSuperClass) {
if (clazz.isAssignableFrom(doStmt.getClass())) {
supportStmtClassNamesCache.add(doStmt.getClass().getSimpleName());
return;
}
}
throw new AnalysisException("Not support this stmt type");
}
private void analyzerSqlStmt() throws UserException {
checkStmtSupport();
doStmt.analyze(analyzer);
}
private String parseExecuteSql(String sql) throws AnalysisException {
@@ -115,111 +184,4 @@ public class CreateJobStmt extends DdlStmt {
}
return executeSql;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
checkAuth();
labelName.analyze(analyzer);
String dbName = labelName.getDbName();
Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
job.setDbName(labelName.getDbName());
job.setJobName(labelName.getLabelName());
if (StringUtils.isNotBlank(onceJobStartTimestamp)) {
analyzerOnceTimeJob();
} else {
analyzerCycleJob();
}
if (ConnectContext.get() != null) {
timezone = ConnectContext.get().getSessionVariable().getTimeZone();
}
timezone = TimeUtils.checkTimeZoneValidAndStandardize(timezone);
job.setTimezone(timezone);
job.setComment(comment);
//todo support user define
job.setUser(ConnectContext.get().getQualifiedUser());
job.setJobStatus(JobStatus.RUNNING);
job.setJobCategory(JobCategory.SQL);
analyzerSqlStmt();
}
protected static void checkAuth() throws AnalysisException {
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
}
private void checkStmtSupport() throws AnalysisException {
if (supportStmtClassNamesCache.contains(stmt.getClass().getSimpleName())) {
return;
}
for (Class<? extends DdlStmt> clazz : supportStmtSuperClass) {
if (clazz.isAssignableFrom(stmt.getClass())) {
supportStmtClassNamesCache.add(stmt.getClass().getSimpleName());
return;
}
}
throw new AnalysisException("Not support this stmt type");
}
private void analyzerSqlStmt() throws UserException {
checkStmtSupport();
stmt.analyze(analyzer);
String originStmt = getOrigStmt().originStmt;
String executeSql = parseExecuteSql(originStmt);
SqlJobExecutor sqlJobExecutor = new SqlJobExecutor(executeSql);
job.setExecutor(sqlJobExecutor);
}
private void analyzerCycleJob() throws UserException {
if (null == interval) {
throw new AnalysisException("interval is null");
}
if (interval <= 0) {
throw new AnalysisException("interval must be greater than 0");
}
if (StringUtils.isBlank(intervalTimeUnit)) {
throw new AnalysisException("intervalTimeUnit is null");
}
try {
IntervalUnit intervalUnit = IntervalUnit.valueOf(intervalTimeUnit.toUpperCase());
job.setIntervalUnit(intervalUnit);
long intervalTimeMs = intervalUnit.getParameterValue(interval);
job.setIntervalMs(intervalTimeMs);
job.setOriginInterval(interval);
} catch (IllegalArgumentException e) {
throw new AnalysisException("interval time unit is not valid, we only support second,minute,hour,day,week");
}
if (StringUtils.isNotBlank(startsTimeStamp)) {
long startsTimeMillis = TimeUtils.timeStringToLong(startsTimeStamp);
if (startsTimeMillis < System.currentTimeMillis()) {
throw new AnalysisException("starts time must be greater than current time");
}
job.setStartTimeMs(startsTimeMillis);
}
if (StringUtils.isNotBlank(endsTimeStamp)) {
long endTimeMillis = TimeUtils.timeStringToLong(endsTimeStamp);
if (endTimeMillis < System.currentTimeMillis()) {
throw new AnalysisException("ends time must be greater than current time");
}
job.setEndTimeMs(endTimeMillis);
}
if (job.getStartTimeMs() > 0 && job.getEndTimeMs() > 0
&& (job.getEndTimeMs() - job.getStartTimeMs() < job.getIntervalMs())) {
throw new AnalysisException("ends time must be greater than start time and interval time");
}
}
private void analyzerOnceTimeJob() throws UserException {
job.setIntervalMs(0L);
long executeAtTimeMillis = TimeUtils.timeStringToLong(onceJobStartTimestamp);
if (executeAtTimeMillis < System.currentTimeMillis()) {
throw new AnalysisException("job time stamp must be greater than current time");
}
job.setStartTimeMs(executeAtTimeMillis);
}
}


@@ -26,12 +26,10 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.scheduler.constants.JobCategory;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
@@ -52,7 +50,7 @@ public class ShowJobStmt extends ShowStmt {
.add("ExecuteType")
.add("RecurringStrategy")
.add("Status")
.add("lastExecuteTaskStatus")
.add("ExecuteSql")
.add("CreateTime")
.add("Comment")
.build();
@@ -65,9 +63,6 @@
@Getter
private String dbFullName; // optional
@Getter
private JobCategory jobCategory; // optional
private String jobCategoryName; // optional
@Getter
@@ -86,11 +81,6 @@
super.analyze(analyzer);
checkAuth();
checkLabelName(analyzer);
if (StringUtils.isBlank(jobCategoryName)) {
this.jobCategory = JobCategory.SQL;
} else {
this.jobCategory = JobCategory.valueOf(jobCategoryName.toUpperCase());
}
}
private void checkAuth() throws AnalysisException {
@@ -122,9 +112,6 @@
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
for (String title : TITLE_NAMES) {
if (this.jobCategory.equals(JobCategory.MTMV) && title.equals(NAME_TITLE)) {
builder.addColumn(new Column(MTMV_NAME_TITLE, ScalarType.createVarchar(30)));
}
builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
}
return builder.build();


@@ -25,12 +25,10 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.scheduler.constants.JobCategory;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
@@ -57,8 +55,6 @@ public class ShowJobTaskStmt extends ShowStmt {
@Getter
private final LabelName labelName;
@Getter
private JobCategory jobCategory; // optional
@Getter
private String dbFullName; // optional
@@ -67,12 +63,6 @@
public ShowJobTaskStmt(String category, LabelName labelName) {
this.labelName = labelName;
String jobCategoryName = category;
if (StringUtils.isBlank(jobCategoryName)) {
this.jobCategory = JobCategory.SQL;
} else {
this.jobCategory = JobCategory.valueOf(jobCategoryName.toUpperCase());
}
}
@Override
@@ -114,9 +104,6 @@
@Override
public RedirectStatus getRedirectStatus() {
if (jobCategory.isPersistent()) {
return RedirectStatus.FORWARD_NO_SYNC;
}
return RedirectStatus.NO_FORWARD;
}
}


@@ -147,6 +147,8 @@ import org.apache.doris.ha.MasterInfo;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.meta.MetaBaseAction;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.manager.JobManager;
import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.journal.bdbje.Timestamp;
@@ -217,13 +219,8 @@ import org.apache.doris.qe.QueryCancelWorker;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.scheduler.disruptor.TaskDisruptor;
import org.apache.doris.scheduler.manager.JobTaskManager;
import org.apache.doris.scheduler.manager.TimerJobManager;
import org.apache.doris.scheduler.manager.TransientTaskManager;
import org.apache.doris.scheduler.registry.ExportTaskRegister;
import org.apache.doris.scheduler.registry.PersistentJobRegister;
import org.apache.doris.scheduler.registry.TimerJobRegister;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
@@ -349,13 +346,10 @@ public class Env {
private CooldownConfHandler cooldownConfHandler;
private MetastoreEventsProcessor metastoreEventsProcessor;
private PersistentJobRegister persistentJobRegister;
private ExportTaskRegister exportTaskRegister;
private TimerJobManager timerJobManager;
private JobManager<? extends AbstractJob> jobManager;
private TransientTaskManager transientTaskManager;
private JobTaskManager jobTaskManager;
private TaskDisruptor taskDisruptor;
private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos
private MasterDaemon txnCleaner; // To clean aborted or timeout txns
private Daemon feDiskUpdater; // Update fe disk info
@@ -625,13 +619,8 @@ public class Env {
this.cooldownConfHandler = new CooldownConfHandler();
}
this.metastoreEventsProcessor = new MetastoreEventsProcessor();
this.jobTaskManager = new JobTaskManager();
this.timerJobManager = new TimerJobManager();
this.jobManager = new JobManager<>();
this.transientTaskManager = new TransientTaskManager();
this.taskDisruptor = new TaskDisruptor(this.timerJobManager, this.transientTaskManager);
this.timerJobManager.setDisruptor(taskDisruptor);
this.transientTaskManager.setDisruptor(taskDisruptor);
this.persistentJobRegister = new TimerJobRegister(timerJobManager);
this.exportTaskRegister = new ExportTaskRegister(transientTaskManager);
this.replayedJournalId = new AtomicLong(0L);
this.stmtIdCounter = new AtomicLong(0L);
@@ -1527,8 +1516,7 @@ public class Env {
publishVersionDaemon.start();
// Start txn cleaner
txnCleaner.start();
taskDisruptor.start();
timerJobManager.start();
jobManager.start();
// Alter
getAlterInstance().start();
// Consistency checker
@@ -2008,29 +1996,17 @@
}
public long loadAsyncJobManager(DataInputStream in, long checksum) throws IOException {
timerJobManager.readFields(in);
jobManager.readFields(in);
LOG.info("finished replay asyncJobMgr from image");
return checksum;
}
public long saveAsyncJobManager(CountingDataOutputStream out, long checksum) throws IOException {
timerJobManager.write(out);
jobManager.write(out);
LOG.info("finished save analysisMgr to image");
return checksum;
}
public long loadJobTaskManager(DataInputStream in, long checksum) throws IOException {
jobTaskManager.readFields(in);
LOG.info("finished replay jobTaskMgr from image");
return checksum;
}
public long saveJobTaskManager(CountingDataOutputStream out, long checksum) throws IOException {
jobTaskManager.write(out);
LOG.info("finished save jobTaskMgr to image");
return checksum;
}
public long loadResources(DataInputStream in, long checksum) throws IOException {
resourceMgr = ResourceMgr.read(in);
LOG.info("finished replay resources from image");
@@ -3818,26 +3794,19 @@ public class Env {
return this.syncJobManager;
}
public PersistentJobRegister getJobRegister() {
return persistentJobRegister;
}
public ExportTaskRegister getExportTaskRegister() {
return exportTaskRegister;
}
public TimerJobManager getAsyncJobManager() {
return timerJobManager;
public JobManager getJobManager() {
return jobManager;
}
public TransientTaskManager getTransientTaskManager() {
return transientTaskManager;
}
public JobTaskManager getJobTaskManager() {
return jobTaskManager;
}
public SmallFileMgr getSmallFileMgr() {
return this.smallFileMgr;
}


@@ -0,0 +1,205 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.base;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.job.task.Task;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import org.apache.commons.collections.CollectionUtils;
import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Data
public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, Writable {
@SerializedName(value = "jid")
private Long jobId;
@SerializedName(value = "jn")
private String jobName;
@SerializedName(value = "js")
private JobStatus jobStatus;
@SerializedName(value = "cdb")
private String currentDbName;
@SerializedName(value = "c")
private String comment;
@SerializedName(value = "cu")
private UserIdentity createUser;
@SerializedName(value = "jc")
private JobExecutionConfiguration jobConfig;
@SerializedName(value = "ctms")
private Long createTimeMs;
@SerializedName(value = "sql")
String executeSql;
private List<T> runningTasks = new ArrayList<>();
@Override
public void cancel() throws JobException {
if (CollectionUtils.isEmpty(runningTasks)) {
return;
}
runningTasks.forEach(Task::cancel);
}
@Override
public void cancel(long taskId) throws JobException {
if (CollectionUtils.isEmpty(runningTasks)) {
throw new JobException("no running task");
}
runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst()
.orElseThrow(() -> new JobException("no task id:" + taskId)).cancel();
}
public void initTasks(List<T> tasks) {
tasks.forEach(task -> {
task.setJobId(jobId);
task.setTaskId(Env.getCurrentEnv().getNextId());
task.setCreateTimeMs(System.currentTimeMillis());
task.setStatus(TaskStatus.PENDING);
});
}
public void checkJobParams() {
if (null == jobId) {
throw new IllegalArgumentException("jobId cannot be null");
}
if (null == jobConfig) {
throw new IllegalArgumentException("jobConfig cannot be null");
}
jobConfig.checkParams(createTimeMs);
checkJobParamsInternal();
}
public void updateJobStatus(JobStatus newJobStatus) {
if (null == newJobStatus) {
throw new IllegalArgumentException("jobStatus cannot be null");
}
if (jobStatus == newJobStatus) {
throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status",
jobStatus.name(), newJobStatus.name()));
}
if (newJobStatus.equals(JobStatus.RUNNING) && !jobStatus.equals(JobStatus.PAUSED)) {
throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status",
jobStatus.name(), newJobStatus.name()));
}
if (newJobStatus.equals(JobStatus.STOPPED) && !jobStatus.equals(JobStatus.RUNNING)) {
throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status",
jobStatus.name(), newJobStatus.name()));
}
jobStatus = newJobStatus;
}
protected abstract void checkJobParamsInternal();
public static AbstractJob readFields(DataInput in) throws IOException {
// todo use RuntimeTypeAdapterFactory of Gson to do the serde
JobType jobType = JobType.valueOf(Text.readString(in));
switch (jobType) {
case INSERT:
return InsertJob.readFields(in);
case MTMV:
// return MTMVJob.readFields(in);
break;
default:
throw new IllegalArgumentException("unknown job type");
}
throw new IllegalArgumentException("unknown job type");
}
@Override
public void onTaskFail(T task) {
updateJobStatusIfEnd();
}
@Override
public void onTaskSuccess(T task) {
updateJobStatusIfEnd();
runningTasks.remove(task);
}
private void updateJobStatusIfEnd() {
JobExecuteType executeType = getJobConfig().getExecuteType();
if (executeType.equals(JobExecuteType.MANUAL)) {
return;
}
switch (executeType) {
case ONE_TIME:
case INSTANT:
jobStatus = JobStatus.FINISHED;
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(jobStatus);
break;
case RECURRING:
TimerDefinition timerDefinition = getJobConfig().getTimerDefinition();
if (null != timerDefinition.getEndTimeMs()
&& timerDefinition.getEndTimeMs() < System.currentTimeMillis()
+ timerDefinition.getIntervalUnit().getIntervalMs(timerDefinition.getInterval())) {
jobStatus = JobStatus.FINISHED;
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(jobStatus);
}
break;
default:
break;
}
}
/**
* Get the job's common show info, used when displaying job information,
* e.g. for the SHOW JOBS statement.
*
* @return List<String> job common show info
*/
public List<String> getCommonShowInfo() {
List<String> commonShowInfo = new ArrayList<>();
commonShowInfo.add(String.valueOf(jobId));
commonShowInfo.add(jobName);
commonShowInfo.add(createUser.getQualifiedUser());
commonShowInfo.add(jobConfig.getExecuteType().name());
commonShowInfo.add(jobConfig.convertRecurringStrategyToString());
commonShowInfo.add(jobStatus.name());
commonShowInfo.add(executeSql);
commonShowInfo.add(TimeUtils.longToTimeString(createTimeMs));
commonShowInfo.add(comment);
return commonShowInfo;
}
}


@@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.base;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.qe.ShowResultSetMetaData;
import java.util.List;
/**
* The Job interface represents a job in the scheduler module, which stores the information of a job.
* A job can be uniquely identified using the job identifier.
* The job name is used for identification purposes and is not necessarily unique.
* The job status is used to control the execution of the job.
*
* @param <T> The type of task associated with the job, extending AbstractTask.
*/
public interface Job<T extends AbstractTask> {
/**
* Creates a list of tasks of the specified type for this job.
*
* @param taskType The type of tasks to create.
* @return A list of tasks.
*/
List<T> createTasks(TaskType taskType);
/**
* Cancels the task with the specified taskId.
*
* @param taskId The ID of the task to cancel.
* @throws JobException If the task is not in the running state, it may have already
* finished and cannot be cancelled.
*/
void cancel(long taskId) throws JobException;
/**
* Checks if the job is ready for scheduling.
* This method is called when starting the scheduled job,
* and if the job is not ready for scheduling, the scheduler will cancel it.
*
* @return True if the job is ready for scheduling, false otherwise.
*/
boolean isReadyForScheduling();
/**
* Retrieves the metadata for the job, which is used to display job information.
*
* @return The metadata for the job.
*/
ShowResultSetMetaData getJobMetaData();
/**
* Retrieves the metadata for the tasks, which is used to display task information.
* The metadata includes fields such as taskId, taskStatus, taskType, taskStartTime, taskEndTime, and taskProgress.
*
* @return The metadata for the tasks.
*/
ShowResultSetMetaData getTaskMetaData();
/**
* Retrieves the type of the job, which is used to identify different types of jobs.
*
* @return The type of the job.
*/
JobType getJobType();
/**
* Queries the list of tasks associated with this job.
*
* @return The list of tasks.
*/
List<T> queryTasks();
/**
* Cancels all running tasks of this job.
*
* @throws JobException If cancelling a running task fails.
*/
void cancel() throws JobException;
/**
* Notifies the job when a task execution fails.
*
* @param task The failed task.
*/
void onTaskFail(T task);
/**
* Notifies the job when a task execution is successful.
*
* @param task The successful task.
*/
void onTaskSuccess(T task);
/**
* Notifies the job when a task execution is cancelled.
*
* @param task The cancelled task.
*/
void onTaskCancel(T task);
}


@@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.constants;
package org.apache.doris.job.base;
public enum JobExecuteType {
public enum JobType {
/**
* The job will be executed only once.
*/
@@ -34,5 +35,9 @@ public enum JobType {
/**
* The job will be executed manually and need to be triggered by the user.
*/
MANUAL
MANUAL,
/**
* The job will be executed immediately.
*/
INSTANT,
}


@@ -0,0 +1,211 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.base;
import org.apache.doris.common.util.TimeUtils;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.List;
public class JobExecutionConfiguration {
@Getter
@Setter
@SerializedName(value = "td")
private TimerDefinition timerDefinition;
@Getter
@Setter
@SerializedName(value = "ec")
private JobExecuteType executeType;
/**
* Maximum number of concurrent tasks, <= 0 means no limit
* if the number of tasks exceeds the limit, task execution will be delayed
* todo: implement this later, we need to consider concurrency strategies
*/
@SerializedName(value = "maxConcurrentTaskNum")
private Integer maxConcurrentTaskNum;
public void checkParams(Long createTimeMs) {
if (executeType == null) {
throw new IllegalArgumentException("executeType cannot be null");
}
if (executeType == JobExecuteType.INSTANT || executeType == JobExecuteType.MANUAL) {
return;
}
checkTimerDefinition(createTimeMs);
if (executeType == JobExecuteType.ONE_TIME) {
validateStartTimeMs();
return;
}
if (executeType == JobExecuteType.STREAMING) {
validateStartTimeMs();
return;
}
if (executeType == JobExecuteType.RECURRING) {
if (timerDefinition.getInterval() == null) {
throw new IllegalArgumentException("interval cannot be null when executeType is RECURRING");
}
if (timerDefinition.getIntervalUnit() == null) {
throw new IllegalArgumentException("intervalUnit cannot be null when executeType is RECURRING");
}
}
}
private void checkTimerDefinition(long createTimeMs) {
if (timerDefinition == null) {
throw new IllegalArgumentException(
"timerDefinition cannot be null when executeType is not instant or manual");
}
timerDefinition.checkParams(createTimeMs);
}
private void validateStartTimeMs() {
if (timerDefinition.getStartTimeMs() == null) {
throw new IllegalArgumentException("startTimeMs cannot be null");
}
if (timerDefinition.getStartTimeMs() < System.currentTimeMillis()) {
throw new IllegalArgumentException("startTimeMs cannot be less than current time");
}
}
// Returns a list of delay times in seconds for triggering the job
public List<Long> getTriggerDelayTimes(Long currentTimeMs, Long startTimeMs, Long endTimeMs) {
List<Long> delayTimeSeconds = new ArrayList<>();
if (JobExecuteType.ONE_TIME.equals(executeType)) {
// If the job is already executed or in the schedule queue, or not within this schedule window
if (null != timerDefinition.getLatestSchedulerTimeMs() || endTimeMs < timerDefinition.getStartTimeMs()) {
return delayTimeSeconds;
}
delayTimeSeconds.add(queryDelayTimeSecond(currentTimeMs, timerDefinition.getStartTimeMs()));
this.timerDefinition.setLatestSchedulerTimeMs(timerDefinition.getStartTimeMs());
return delayTimeSeconds;
}
if (JobExecuteType.STREAMING.equals(executeType) && null != timerDefinition) {
if (null == timerDefinition.getStartTimeMs() || null != timerDefinition.getLatestSchedulerTimeMs()) {
return delayTimeSeconds;
}
// If the job is already executed or in the schedule queue, or not within this schedule window
if (endTimeMs < timerDefinition.getStartTimeMs()) {
return delayTimeSeconds;
}
delayTimeSeconds.add(queryDelayTimeSecond(currentTimeMs, timerDefinition.getStartTimeMs()));
this.timerDefinition.setLatestSchedulerTimeMs(timerDefinition.getStartTimeMs());
return delayTimeSeconds;
}
if (JobExecuteType.RECURRING.equals(executeType)) {
if (timerDefinition.getStartTimeMs() > endTimeMs || null != timerDefinition.getEndTimeMs()
&& timerDefinition.getEndTimeMs() < startTimeMs) {
return delayTimeSeconds;
}
return getExecutionDelaySeconds(startTimeMs, endTimeMs, timerDefinition.getStartTimeMs(),
timerDefinition.getIntervalUnit().getIntervalMs(timerDefinition.getInterval()), currentTimeMs);
}
return delayTimeSeconds;
}
// Returns the delay time in seconds between the current time and the specified start time
private Long queryDelayTimeSecond(Long currentTimeMs, Long startTimeMs) {
if (startTimeMs <= currentTimeMs) {
return 0L;
}
return (startTimeMs - currentTimeMs) / 1000;
}
// Returns a list of delay times in seconds for executing the job within the specified window
private List<Long> getExecutionDelaySeconds(long windowStartTimeMs, long windowEndTimeMs, long startTimeMs,
long intervalMs, long currentTimeMs) {
List<Long> timestamps = new ArrayList<>();
long windowDuration = windowEndTimeMs - windowStartTimeMs;
if (windowDuration <= 0 || intervalMs <= 0) {
return timestamps; // Return an empty list if there won't be any trigger time
}
long firstTriggerTime = windowStartTimeMs + (intervalMs - ((windowStartTimeMs - startTimeMs)
% intervalMs)) % intervalMs;
if (firstTriggerTime < currentTimeMs) {
firstTriggerTime += intervalMs;
}
if (firstTriggerTime > windowEndTimeMs) {
return timestamps; // Return an empty list if there won't be any trigger time
}
// Calculate the trigger time list
for (long triggerTime = firstTriggerTime; triggerTime <= windowEndTimeMs; triggerTime += intervalMs) {
if (triggerTime >= currentTimeMs && (null == timerDefinition.getEndTimeMs()
|| triggerTime < timerDefinition.getEndTimeMs())) {
timestamps.add(queryDelayTimeSecond(currentTimeMs, triggerTime));
}
}
return timestamps;
}
public String convertRecurringStrategyToString() {
switch (executeType) {
case ONE_TIME:
return "AT " + TimeUtils.longToTimeString(timerDefinition.getStartTimeMs());
case RECURRING:
String result = "EVERY " + timerDefinition.getInterval() + " "
+ timerDefinition.getIntervalUnit().name() + " STARTS "
+ TimeUtils.longToTimeString(timerDefinition.getStartTimeMs());
if (null != timerDefinition.getEndTimeMs()) {
result += " ENDS " + TimeUtils.longToTimeString(timerDefinition.getEndTimeMs());
}
return result;
/* case STREAMING:
return "STREAMING" + (startTimeMs > 0 ? " AT " + TimeUtils.longToTimeString(startTimeMs) : "");*/
case MANUAL:
return "MANUAL TRIGGER";
case INSTANT:
return "INSTANT";
default:
return "UNKNOWN";
}
}
public boolean checkIsTimerJob() {
return null != timerDefinition;
}
}
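
A worked example of the window arithmetic in getExecutionDelaySeconds above (illustrative only, not part of the commit; assumes timerDefinition.getEndTimeMs() is null):

```java
// Suppose startTimeMs = 0, intervalMs = 600_000 (10 minutes),
// window = [1_560_000, 2_400_000], currentTimeMs = 1_500_000.
//
// firstTriggerTime = 1_560_000
//         + (600_000 - ((1_560_000 - 0) % 600_000)) % 600_000
//     = 1_560_000 + 240_000
//     = 1_800_000
//
// Trigger times <= window end: 1_800_000 and 2_400_000, so the returned
// delays are (1_800_000 - 1_500_000) / 1000 = 300 s and
// (2_400_000 - 1_500_000) / 1000 = 900 s.
```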


@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.base;
import org.apache.doris.job.common.IntervalUnit;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
@Data
public class TimerDefinition {
@SerializedName(value = "il")
private Long interval;
@SerializedName(value = "iu")
private IntervalUnit intervalUnit;
@SerializedName(value = "stm")
private Long startTimeMs;
@SerializedName(value = "etm")
private Long endTimeMs;
private Long latestSchedulerTimeMs;
public void checkParams(Long createTimeMs) {
if (null != startTimeMs && startTimeMs < System.currentTimeMillis()) {
throw new IllegalArgumentException("startTimeMs must be greater than current time");
}
if (null == startTimeMs) {
startTimeMs = createTimeMs + intervalUnit.getIntervalMs(interval);
}
if (null != endTimeMs && endTimeMs < startTimeMs) {
throw new IllegalArgumentException("end time cannot be less than start time");
}
if (null != intervalUnit) {
if (null == interval) {
throw new IllegalArgumentException("interval cannot be null when intervalUnit is not null");
}
if (interval <= 0) {
throw new IllegalArgumentException("interval must be greater than 0");
}
}
}
}


@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.common;
package org.apache.doris.job.common;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
@@ -60,7 +60,7 @@ public enum IntervalUnit {
.orElseThrow(() -> new IllegalArgumentException("Unknown configuration " + name));
}
public Long getParameterValue(Long param) {
return (Long) (param != null ? converter.apply(param) : defaultValue);
public Long getIntervalMs(Long interval) {
return (Long) (interval != null ? converter.apply(interval) : defaultValue);
}
}


@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.constants;
package org.apache.doris.job.common;
public enum JobStatus {
@@ -42,3 +42,4 @@ public enum JobStatus {
*/
FINISHED
}


@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.common;
public enum JobType {
INSERT,
MTMV
}


@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.common;
public enum TaskStatus {
PENDING,
CANCEL,
RUNNING,
SUCCESS,
FAILED;
}


@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.common;
public enum TaskType {
SCHEDULED,
MANUAL;
}


@@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.disruptor;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.task.AbstractTask;
import com.lmax.disruptor.EventFactory;
import lombok.Data;
@Data
public class ExecuteTaskEvent<T extends AbstractTask> {
private T task;
private JobExecutionConfiguration jobConfig;
public static <T extends AbstractTask> EventFactory<ExecuteTaskEvent<T>> factory() {
return ExecuteTaskEvent::new;
}
}


@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.disruptor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslatorVararg;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;
/**
* Utility class for creating and managing a Disruptor instance.
*
* @param <T> the type of the event handled by the Disruptor
*/
public class TaskDisruptor<T> {
private final Disruptor<T> disruptor;
private final EventTranslatorVararg<T> eventTranslator;
/**
* Constructs a TaskDisruptor instance.
*
* @param eventFactory the factory for creating events
* @param ringBufferSize the size of the ring buffer
* @param threadFactory the thread factory to create threads for event handling
* @param waitStrategy the wait strategy for the ring buffer
* @param workHandlers the work handlers for processing events
* @param eventTranslator the translator for publishing events with variable arguments
*/
public TaskDisruptor(EventFactory<T> eventFactory, int ringBufferSize, ThreadFactory threadFactory,
WaitStrategy waitStrategy, WorkHandler<T>[] workHandlers,
EventTranslatorVararg<T> eventTranslator) {
disruptor = new Disruptor<>(eventFactory, ringBufferSize, threadFactory,
ProducerType.SINGLE, waitStrategy);
disruptor.handleEventsWithWorkerPool(workHandlers);
this.eventTranslator = eventTranslator;
disruptor.start();
}
/**
* Starts the Disruptor.
*/
public void start() {
disruptor.start();
}
/**
* Publishes an event with the provided arguments.
*
* @param args the arguments for the event
*/
public void publishEvent(Object... args) {
RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
ringBuffer.publishEvent(eventTranslator, args);
}
/**
* Shuts down the Disruptor.
*/
public void shutdown() {
disruptor.shutdown();
}
}


@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.disruptor;
import org.apache.doris.job.base.AbstractJob;
import com.lmax.disruptor.EventFactory;
import lombok.Data;
@Data
public class TimerJobEvent<T extends AbstractJob<?>> {
private T job;
public static <T extends AbstractJob<?>> EventFactory<TimerJobEvent<T>> factory() {
return TimerJobEvent::new;
}
}


@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.exception;
/**
* This class represents a job exception that can be thrown when a job is executed.
*/
public class JobException extends Exception {
public JobException(String message) {
super(message);
}
public JobException(String format, Object... msg) {
super(String.format(format, msg));
}
public JobException(String message, Throwable cause) {
super(message, cause);
}
public JobException(Throwable cause) {
super(cause);
}
}


@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.executor;
import org.apache.doris.job.disruptor.ExecuteTaskEvent;
import org.apache.doris.job.task.AbstractTask;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
/**
* DefaultTaskExecutor is an implementation of the TaskExecutor interface.
* If you need to implement your own TaskExecutor, you can refer to this class; it also needs to be
* registered in the TaskExecutorFactory.
* It executes a given AbstractTask by acquiring a semaphore token from the TaskTokenManager
* and releasing it after the task execution.
*/
@Slf4j
public class DefaultTaskExecutorHandler<T extends AbstractTask> implements WorkHandler<ExecuteTaskEvent<T>> {
@Override
public void onEvent(ExecuteTaskEvent<T> executeTaskEvent) {
T task = executeTaskEvent.getTask();
if (null == task) {
log.warn("task is null, ignore,maybe task has been canceled");
return;
}
if (task.isCancelled()) {
log.info("task is canceled, ignore");
return;
}
try {
task.runTask();
} catch (Exception e) {
// if task.onFail() throws an exception, it is caught here
log.warn("task execution error, task id is {}", task.getTaskId(), e);
}
//todo we need discuss whether we need to use semaphore to control the concurrent task num
/* Semaphore semaphore = null;
// get token
try {
int maxConcurrentTaskNum = executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum();
semaphore = TaskTokenManager.tryAcquire(task.getJobId(), maxConcurrentTaskNum);
task.runTask();
} catch (Exception e) {
task.onFail();
log.error("execute task error, task id is {}", task.getTaskId(), e);
} finally {
if (null != semaphore) {
semaphore.release();
}*/
}
}


@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.executor;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.disruptor.TaskDisruptor;
import org.apache.doris.job.disruptor.TimerJobEvent;
import org.apache.doris.job.task.AbstractTask;
import com.lmax.disruptor.WorkHandler;
import jline.internal.Log;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
/**
* Dispatches timer jobs to the task disruptor.
* When a job is ready for scheduling and its status is RUNNING,
* we create its tasks and publish them to the task disruptor; see DefaultTaskExecutorHandler.
*/
@Slf4j
public class DispatchTaskHandler<T extends AbstractJob<?>> implements WorkHandler<TimerJobEvent<T>> {
private final Map<JobType, TaskDisruptor<T>> disruptorMap;
public DispatchTaskHandler(Map<JobType, TaskDisruptor<T>> disruptorMap) {
this.disruptorMap = disruptorMap;
}
@Override
public void onEvent(TimerJobEvent<T> event) {
try {
if (null == event.getJob()) {
log.info("job is null,may be job is deleted, ignore");
return;
}
if (event.getJob().isReadyForScheduling() && event.getJob().getJobStatus() == JobStatus.RUNNING) {
List<? extends AbstractTask> tasks = event.getJob().createTasks(TaskType.SCHEDULED);
JobType jobType = event.getJob().getJobType();
for (AbstractTask task : tasks) {
disruptorMap.get(jobType).publishEvent(task, event.getJob().getJobConfig());
}
}
} catch (Exception e) {
Log.warn("dispatch timer job error, task id is {}", event.getJob().getJobId(), e);
}
}
}


@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.executor;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.disruptor.TaskDisruptor;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import jline.internal.Log;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TimerJobSchedulerTask<T extends AbstractJob<?>> implements TimerTask {
private TaskDisruptor dispatchDisruptor;
private final T job;
public TimerJobSchedulerTask(TaskDisruptor dispatchDisruptor, T job) {
this.dispatchDisruptor = dispatchDisruptor;
this.job = job;
}
@Override
public void run(Timeout timeout) {
try {
dispatchDisruptor.publishEvent(this.job);
} catch (Exception e) {
Log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e);
}
}
}


@@ -15,21 +15,11 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.job;
package org.apache.doris.job.extensions.insert;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class ExecutorResult<T> {
private T result;
private boolean success;
private String errorMsg;
private String executorSql;
public class InsertIntoState {
private String trackingUrl;
}


@@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.extensions.insert;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.io.Text;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Data
public class InsertJob extends AbstractJob<InsertTask> {
@SerializedName(value = "labelPrefix")
String labelPrefix;
@Override
public List<InsertTask> createTasks(TaskType taskType) {
InsertTask task = new InsertTask(null, null, null, null, null);
task.setJobId(getJobId());
task.setTaskType(taskType);
task.setTaskId(Env.getCurrentEnv().getNextId());
ArrayList<InsertTask> tasks = new ArrayList<>();
tasks.add(task);
super.initTasks(tasks);
getRunningTasks().addAll(tasks);
return tasks;
}
@Override
public void cancel(long taskId) throws JobException {
super.cancel();
}
@Override
public void cancel() throws JobException {
super.cancel();
}
@Override
public boolean isReadyForScheduling() {
return true;
}
@Override
protected void checkJobParamsInternal() {
}
public static InsertJob readFields(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), InsertJob.class);
}
@Override
public List<InsertTask> queryTasks() {
return null;
}
@Override
public JobType getJobType() {
return JobType.INSERT;
}
@Override
public ShowResultSetMetaData getJobMetaData() {
return null;
}
@Override
public ShowResultSetMetaData getTaskMetaData() {
return null;
}
@Override
public void onTaskFail(InsertTask task) {
getRunningTasks().remove(task);
}
@Override
public void onTaskSuccess(InsertTask task) {
getRunningTasks().remove(task);
}
@Override
public void onTaskCancel(InsertTask task) {
getRunningTasks().remove(task);
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, JobType.INSERT.name());
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
}
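The write method frames the payload as a JobType tag followed by a GSON string. A hedged sketch of how a generic reader (for example AbstractJob.readFields, whose body is not part of this diff) would presumably consume that framing:

```java
// Assumption: the caller consumes the type tag written by InsertJob.write()
// and then delegates to the matching subtype reader. Illustrative only.
public static AbstractJob<?> readJobSketch(DataInput in) throws IOException {
    String type = Text.readString(in); // e.g. "INSERT"
    if (JobType.INSERT.name().equals(type)) {
        return InsertJob.readFields(in); // parses the GSON payload
    }
    throw new IOException("unknown job type: " + type);
}
```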

View File

@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.extensions.insert;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import lombok.extern.slf4j.Slf4j;
/**
* todo implement this later
*/
@Slf4j
public class InsertTask extends AbstractTask {
private String labelName;
private InsertIntoTableCommand command;
private LoadJob.LoadStatistic statistic;
private FailMsg failMsg;
private InsertIntoState insertIntoState;
@Override
public void before() {
super.before();
}
public InsertTask(String labelName, InsertIntoTableCommand command, LoadJob.LoadStatistic statistic,
FailMsg failMsg, InsertIntoState insertIntoState) {
this.labelName = labelName;
this.command = command;
this.statistic = statistic;
this.failMsg = failMsg;
this.insertIntoState = insertIntoState;
}
@Override
public void run() {
//just for test
log.info("InsertTask run, job id is {}, time: {}", getJobId(), TimeUtils.longToTimeString(System.currentTimeMillis()));
}
@Override
public void onFail() {
super.onFail();
}
@Override
public void onSuccess() {
super.onSuccess();
}
@Override
public void cancel() {
super.cancel();
}
}

View File

@ -0,0 +1,216 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.manager;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.scheduler.JobScheduler;
import org.apache.doris.job.task.AbstractTask;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Slf4j
public class JobManager<T extends AbstractJob<?>> implements Writable {
private final ConcurrentHashMap<Long, T> jobMap = new ConcurrentHashMap<>(32);
private JobScheduler jobScheduler;
public void start() {
jobScheduler = new JobScheduler(jobMap);
jobScheduler.start();
}
public void registerJob(T job) throws JobException {
job.checkJobParams();
checkJobNameExist(job.getJobName(), job.getJobType(), job.getCurrentDbName());
if (jobMap.get(job.getJobId()) != null) {
throw new JobException("job id exist,jobId:" + job.getJobId());
}
//Env.getCurrentEnv().getEditLog().logCreateJob(job);
//check name exist
jobMap.put(job.getJobId(), job);
//check its need to scheduler
jobScheduler.scheduleOneJob(job);
}
private void checkJobNameExist(String jobName, JobType type, String currentDbName) throws JobException {
if (jobMap.values().stream().anyMatch(a -> a.getJobName().equals(jobName) && a.getJobType().equals(type)
&& (null == a.getCurrentDbName() || a.getCurrentDbName().equals(currentDbName)))) {
throw new JobException("job name exist,jobName:" + jobName);
}
}
public void unregisterJob(Long jobId) throws JobException {
checkJobExist(jobId);
jobMap.get(jobId).setJobStatus(JobStatus.STOPPED);
jobMap.get(jobId).cancel();
//Env.getCurrentEnv().getEditLog().logDeleteJob(jobMap.get(jobId));
jobMap.remove(jobId);
}
public void unregisterJob(String currentDbName, String jobName) throws JobException {
for (T a : jobMap.values()) {
if (a.getJobName().equals(jobName) && (null != a.getCurrentDbName()
&& a.getCurrentDbName().equals(currentDbName)) && a.getJobType().equals(JobType.INSERT)) {
try {
unregisterJob(a.getJobId());
} catch (JobException e) {
throw new JobException("unregister job error,jobName:" + jobName);
}
}
}
}
public void alterJobStatus(Long jobId, JobStatus status) throws JobException {
checkJobExist(jobId);
jobMap.get(jobId).updateJobStatus(status);
//Env.getCurrentEnv().getEditLog().logUpdateJob(jobMap.get(jobId));
}
public void alterJobStatus(String currentDbName, String jobName, JobStatus jobStatus) throws JobException {
for (T a : jobMap.values()) {
if (a.getJobName().equals(jobName) && (null != a.getCurrentDbName()
&& a.getCurrentDbName().equals(currentDbName)) && JobType.INSERT.equals(a.getJobType())) {
try {
alterJobStatus(a.getJobId(), jobStatus);
} catch (JobException e) {
throw new JobException("unregister job error,jobName:" + jobName);
}
}
}
}
private void checkJobExist(Long jobId) throws JobException {
if (null == jobMap.get(jobId)) {
throw new JobException("job not exist,jobId:" + jobId);
}
}
public List<T> queryJobs(JobType type) {
return jobMap.values().stream().filter(a -> a.getJobType().equals(type))
.collect(Collectors.toList());
}
public List<T> queryJobs(String currentDb, String jobName) {
//currently only INSERT jobs are exposed, so we only query those
return jobMap.values().stream().filter(a -> checkItsMatch(currentDb, jobName, a))
.collect(Collectors.toList());
}
private boolean checkItsMatch(String currentDb, String jobName, T job) {
if (StringUtils.isBlank(jobName)) {
return job.getJobType().equals(JobType.INSERT) && (null != job.getCurrentDbName()
&& job.getCurrentDbName().equals(currentDb));
}
return job.getJobType().equals(JobType.INSERT) && (null != job.getCurrentDbName()
&& job.getCurrentDbName().equals(currentDb)) && job.getJobName().equals(jobName);
}
public List<? extends AbstractTask> queryTasks(Long jobId) throws JobException {
checkJobExist(jobId);
return jobMap.get(jobId).queryTasks();
}
public void triggerJob(long jobId) throws JobException {
checkJobExist(jobId);
jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL);
}
public void replayCreateJob(T job) {
if (jobMap.containsKey(job.getJobId())) {
return;
}
jobMap.putIfAbsent(job.getJobId(), job);
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
.add("msg", "replay create scheduler job").build());
}
/**
* Replay update job.
**/
public void replayUpdateJob(T job) {
jobMap.put(job.getJobId(), job);
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
.add("msg", "replay update scheduler job").build());
}
public void replayDeleteJob(T job) {
if (null == jobMap.get(job.getJobId())) {
return;
}
jobMap.remove(job.getJobId());
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
.add("msg", "replay delete scheduler job").build());
}
void cancelTask(Long jobId, Long taskId) throws JobException {
checkJobExist(jobId);
if (null == jobMap.get(jobId).getRunningTasks()) {
throw new JobException("task not exist,taskId:" + taskId);
}
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(jobMap.size());
jobMap.forEach((jobId, job) -> {
try {
job.write(out);
} catch (IOException e) {
log.error("write job error,jobId:" + jobId, e);
}
});
}
/**
* read jobs from the data input and restore them into the job map
*
* @param in data input
* @throws IOException if reading from the data input fails
*/
public void readFields(DataInput in) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; i++) {
AbstractJob job = AbstractJob.readFields(in);
jobMap.putIfAbsent(job.getJobId(), (T) job);
}
}
public T getJob(Long jobId) {
return jobMap.get(jobId);
}
}
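A hedged usage sketch of the registration interface above; the job's fields (id, name, config) are assumed to be populated elsewhere, e.g. by the CREATE JOB statement path:

```java
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertJob;

class JobManagerUsageSketch {
    static void registerAndTrigger() throws JobException {
        JobManager<InsertJob> jobManager = new JobManager<>();
        jobManager.start();                    // creates and starts the JobScheduler
        InsertJob job = new InsertJob();       // assumes jobId/jobName/config are set
        jobManager.registerJob(job);           // validates params, then schedules if needed
        jobManager.triggerJob(job.getJobId()); // dispatches a MANUAL task immediately
    }
}
```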

View File

@ -0,0 +1,113 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.manager;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.disruptor.ExecuteTaskEvent;
import org.apache.doris.job.disruptor.TaskDisruptor;
import org.apache.doris.job.disruptor.TimerJobEvent;
import org.apache.doris.job.executor.DefaultTaskExecutorHandler;
import org.apache.doris.job.executor.DispatchTaskHandler;
import org.apache.doris.job.extensions.insert.InsertTask;
import org.apache.doris.job.task.AbstractTask;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslatorVararg;
import com.lmax.disruptor.WorkHandler;
import lombok.Getter;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
public class TaskDisruptorGroupManager<T extends AbstractTask> {
private final Map<JobType, TaskDisruptor<T>> disruptorMap = new EnumMap<>(JobType.class);
@Getter
private TaskDisruptor<TimerJobEvent<AbstractJob<?>>> dispatchDisruptor;
private static final int DEFAULT_RING_BUFFER_SIZE = 1024;
private static final int DEFAULT_CONSUMER_THREAD_NUM = 5;
private static final int DISPATCH_TIMER_JOB_QUEUE_SIZE = Config.job_dispatch_timer_job_queue_size > 0
? Config.job_dispatch_timer_job_queue_size : DEFAULT_RING_BUFFER_SIZE;
private static final int DISPATCH_TIMER_JOB_CONSUMER_THREAD_NUM = Config.job_dispatch_timer_job_thread_num > 0
? Config.job_dispatch_timer_job_thread_num : DEFAULT_CONSUMER_THREAD_NUM;
private static final int DISPATCH_INSERT_THREAD_NUM = Config.job_insert_task_consumer_thread_num > 0
? Config.job_insert_task_consumer_thread_num : DEFAULT_CONSUMER_THREAD_NUM;
private static final int DISPATCH_INSERT_TASK_QUEUE_SIZE = DEFAULT_RING_BUFFER_SIZE;
public void init() {
registerInsertDisruptor();
//once all task queues are ready, dispatch tasks to the registered task executors
registerDispatchDisruptor();
}
private void registerDispatchDisruptor() {
EventFactory<TimerJobEvent<AbstractJob<T>>> dispatchEventFactory = TimerJobEvent.factory();
ThreadFactory dispatchThreadFactory = new CustomThreadFactory("dispatch-task");
WorkHandler[] dispatchTaskExecutorHandlers = new WorkHandler[DISPATCH_TIMER_JOB_CONSUMER_THREAD_NUM];
for (int i = 0; i < DISPATCH_TIMER_JOB_CONSUMER_THREAD_NUM; i++) {
dispatchTaskExecutorHandlers[i] = new DispatchTaskHandler(this.disruptorMap);
}
EventTranslatorVararg<TimerJobEvent<AbstractJob<T>>> eventTranslator =
(event, sequence, args) -> event.setJob((AbstractJob<T>) args[0]);
this.dispatchDisruptor = new TaskDisruptor<>(dispatchEventFactory, DISPATCH_TIMER_JOB_QUEUE_SIZE,
dispatchThreadFactory,
new BlockingWaitStrategy(), dispatchTaskExecutorHandlers, eventTranslator);
}
private void registerInsertDisruptor() {
EventFactory<ExecuteTaskEvent<InsertTask>> insertEventFactory = ExecuteTaskEvent.factory();
ThreadFactory insertTaskThreadFactory = new CustomThreadFactory("insert-task-execute");
WorkHandler[] insertTaskExecutorHandlers = new WorkHandler[DISPATCH_INSERT_THREAD_NUM];
for (int i = 0; i < DISPATCH_INSERT_THREAD_NUM; i++) {
insertTaskExecutorHandlers[i] = new DefaultTaskExecutorHandler<InsertTask>();
}
EventTranslatorVararg<ExecuteTaskEvent<InsertTask>> eventTranslator =
(event, sequence, args) -> {
event.setTask((InsertTask) args[0]);
event.setJobConfig((JobExecutionConfiguration) args[1]);
};
TaskDisruptor insertDisruptor = new TaskDisruptor<>(insertEventFactory, DISPATCH_INSERT_TASK_QUEUE_SIZE,
insertTaskThreadFactory, new BlockingWaitStrategy(), insertTaskExecutorHandlers, eventTranslator);
disruptorMap.put(JobType.INSERT, insertDisruptor);
}
public void dispatchTimerJob(AbstractJob<T> job) {
dispatchDisruptor.publishEvent(job);
}
public void dispatchInstantTask(AbstractTask task, JobType jobType,
JobExecutionConfiguration jobExecutionConfiguration) {
disruptorMap.get(jobType).publishEvent(task, jobExecutionConfiguration);
}
}
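Per the refactoring goal of giving each job type its own thread pool, a new job type would presumably register its own disruptor inside this manager, alongside the INSERT one. A hedged sketch (MTMVTask and JobType.MTMV are assumed names, not part of this hunk):

```java
// Illustrative only: mirrors registerInsertDisruptor() for a hypothetical MTMV type.
private void registerMtmvDisruptor() {
    EventFactory<ExecuteTaskEvent<MTMVTask>> mtmvEventFactory = ExecuteTaskEvent.factory();
    ThreadFactory mtmvThreadFactory = new CustomThreadFactory("mtmv-task-execute");
    WorkHandler[] mtmvHandlers = new WorkHandler[DEFAULT_CONSUMER_THREAD_NUM];
    for (int i = 0; i < DEFAULT_CONSUMER_THREAD_NUM; i++) {
        mtmvHandlers[i] = new DefaultTaskExecutorHandler<MTMVTask>();
    }
    EventTranslatorVararg<ExecuteTaskEvent<MTMVTask>> mtmvTranslator = (event, sequence, args) -> {
        event.setTask((MTMVTask) args[0]);
        event.setJobConfig((JobExecutionConfiguration) args[1]);
    };
    TaskDisruptor mtmvDisruptor = new TaskDisruptor<>(mtmvEventFactory, DEFAULT_RING_BUFFER_SIZE,
            mtmvThreadFactory, new BlockingWaitStrategy(), mtmvHandlers, mtmvTranslator);
    disruptorMap.put(JobType.MTMV, mtmvDisruptor);
}
```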

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.manager;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
/**
* TaskTokenManager is responsible for managing semaphore tokens for different jobs.
* It provides a method to acquire a semaphore token for a specific job ID with the given maximum concurrency.
* If a semaphore doesn't exist for the job ID, it creates a new one and adds it to the map.
*/
@Slf4j
@UtilityClass
public class TaskTokenManager {
private static final Map<Long, Semaphore> taskTokenMap = new ConcurrentHashMap<>(16);
/**
* Acquires a permit from the semaphore for the specified job ID, creating the semaphore
* with the given maximum concurrency if it doesn't exist yet. The caller is responsible
* for releasing the permit on the returned semaphore once the task finishes.
*
* @param jobId the ID of the job
* @param maxConcurrent the maximum concurrency for the job
* @return the semaphore from which a permit was acquired
*/
public static Semaphore tryAcquire(long jobId, long maxConcurrent) {
Semaphore semaphore = taskTokenMap.computeIfAbsent(jobId, id -> new Semaphore((int) maxConcurrent));
try {
semaphore.acquire();
} catch (InterruptedException e) {
log.warn("Interrupted while acquiring semaphore for job id: {} ", jobId, e);
Thread.currentThread().interrupt();
}
return semaphore;
}
}
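A hedged caller sketch: since tryAcquire already takes one permit, the caller must release it when the task finishes, typically in a finally block.

```java
import java.util.concurrent.Semaphore;

class TaskTokenUsageSketch {
    // jobId/maxConcurrent come from the job's execution configuration (assumed).
    static void runWithToken(long jobId, long maxConcurrent, Runnable taskBody) {
        Semaphore token = TaskTokenManager.tryAcquire(jobId, maxConcurrent);
        try {
            taskBody.run();
        } finally {
            token.release(); // return the permit so other tasks of this job can run
        }
    }
}
```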

View File

@ -0,0 +1,175 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.scheduler;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.disruptor.TaskDisruptor;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.executor.TimerJobSchedulerTask;
import org.apache.doris.job.manager.TaskDisruptorGroupManager;
import org.apache.doris.job.task.AbstractTask;
import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slf4j
public class JobScheduler<T extends AbstractJob<?>> implements Closeable {
/**
* time wheel used to schedule timer jobs
*/
private HashedWheelTimer timerTaskScheduler;
private TaskDisruptor timerJobDisruptor;
private TaskDisruptorGroupManager taskDisruptorGroupManager;
private long latestBatchSchedulerTimerTaskTimeMs = 0L;
private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 60;
private static final int HASHED_WHEEL_TIMER_TICKS_PER_WHEEL = 660;
private final Map<Long, T> jobMap;
public JobScheduler(Map<Long, T> jobMap) {
this.jobMap = jobMap;
}
/**
* batch scheduler interval in milliseconds
*/
private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = BATCH_SCHEDULER_INTERVAL_SECONDS * 1000L;
public void start() {
timerTaskScheduler = new HashedWheelTimer(new CustomThreadFactory("timer-task-scheduler"), 1,
TimeUnit.SECONDS, HASHED_WHEEL_TIMER_TICKS_PER_WHEEL);
timerTaskScheduler.start();
taskDisruptorGroupManager = new TaskDisruptorGroupManager();
taskDisruptorGroupManager.init();
this.timerJobDisruptor = taskDisruptorGroupManager.getDispatchDisruptor();
latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
batchSchedulerTimerJob();
cycleSystemSchedulerTasks();
}
/**
* Re-register the batch scheduler task every BATCH_SCHEDULER_INTERVAL_SECONDS.
* Jobs will be re-registered after the batch task completes.
*/
private void cycleSystemSchedulerTasks() {
log.info("re-register system scheduler timer tasks" + TimeUtils.longToTimeString(System.currentTimeMillis()));
timerTaskScheduler.newTimeout(timeout -> {
batchSchedulerTimerJob();
cycleSystemSchedulerTasks();
}, BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
private void batchSchedulerTimerJob() {
executeTimerJobsWithinNextWindow();
}
public void scheduleOneJob(T job) throws JobException {
if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
return;
}
if (!job.getJobConfig().checkIsTimerJob()) {
//manual jobs are only triggered by the user and are never auto-scheduled
if (JobExecuteType.MANUAL.equals(job.getJobConfig().getExecuteType())) {
return;
}
//todo: skip streaming jobs; improve in the future
if (JobExecuteType.INSTANT.equals(job.getJobConfig().getExecuteType())) {
schedulerInstantJob(job, TaskType.SCHEDULED);
}
}
//if it's a timer job whose last trigger window has already started, schedule it immediately
cycleTimerJobScheduler(job);
}
@Override
public void close() throws IOException {
//todo implement this later
}
private void cycleTimerJobScheduler(T job) {
List<Long> delaySeconds = job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(),
System.currentTimeMillis(), latestBatchSchedulerTimerTaskTimeMs);
if (CollectionUtils.isNotEmpty(delaySeconds)) {
delaySeconds.forEach(delaySecond -> {
TimerJobSchedulerTask<T> timerJobSchedulerTask = new TimerJobSchedulerTask<>(timerJobDisruptor, job);
timerTaskScheduler.newTimeout(timerJobSchedulerTask, delaySecond, TimeUnit.SECONDS);
});
}
}
public void schedulerInstantJob(T job, TaskType taskType) throws JobException {
if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
throw new JobException("job is not running,job id is %d", job.getJobId());
}
if (!job.isReadyForScheduling()) {
log.info("job is not ready for scheduling,job id is {}", job.getJobId());
return;
}
List<? extends AbstractTask> tasks = job.createTasks(taskType);
if (CollectionUtils.isEmpty(tasks)) {
if (job.getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
job.setJobStatus(JobStatus.FINISHED);
}
return;
}
tasks.forEach(task -> taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(),
job.getJobConfig()));
}
/**
* Collect the timer jobs that fall into the next time window and hand them over to the time wheel for timed triggering
*/
private void executeTimerJobsWithinNextWindow() {
if (jobMap.isEmpty()) {
return;
}
if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
}
this.latestBatchSchedulerTimerTaskTimeMs += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
for (Map.Entry<Long, T> entry : jobMap.entrySet()) {
T job = entry.getValue();
if (!job.getJobConfig().checkIsTimerJob()) {
continue;
}
cycleTimerJobScheduler(job);
}
}
}
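To make the window bookkeeping above concrete, a worked trace under assumed timestamps:

```java
// Worked trace of executeTimerJobsWithinNextWindow() (values assumed),
// with BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = 60_000:
//
// now = 100_000, latest = 90_000  -> latest lags behind, reset to now: latest = 100_000
// latest += 60_000                -> jobs are collected for the (100s, 160s] window
// 60s later the timer fires again -> the window advances by another 60s, and so on
```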

View File

@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.task;
import org.apache.doris.catalog.Env;
import org.apache.doris.job.base.Job;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.common.TaskType;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@Data
@Slf4j
public abstract class AbstractTask implements Task {
private Long jobId;
private Long taskId;
private TaskStatus status;
private Long createTimeMs;
private Long startTimeMs;
private Long finishTimeMs;
private TaskType taskType;
@Override
public void onFail(String msg) {
if (!isCallable()) {
return;
}
status = TaskStatus.FAILD;
setFinishTimeMs(System.currentTimeMillis());
Env.getCurrentEnv().getJobManager().getJob(jobId).onTaskFail(this);
}
@Override
public void onFail() {
if (!isCallable()) {
return;
}
status = TaskStatus.FAILD;
setFinishTimeMs(System.currentTimeMillis());
Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
job.onTaskFail(this);
}
private boolean isCallable() {
if (status.equals(TaskStatus.CANCEL)) {
return false;
}
return null != Env.getCurrentEnv().getJobManager().getJob(jobId);
}
@Override
public void onSuccess() {
status = TaskStatus.SUCCESS;
setFinishTimeMs(System.currentTimeMillis());
if (!isCallable()) {
return;
}
Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
if (null == job) {
log.info("job is null, job id is {}", jobId);
return;
}
job.onTaskSuccess(this);
}
@Override
public void cancel() {
status = TaskStatus.CANCEL;
}
@Override
public void before() {
status = TaskStatus.RUNNING;
setStartTimeMs(System.currentTimeMillis());
}
public void runTask() {
try {
before();
run();
onSuccess();
} catch (Exception e) {
onFail();
log.warn("execute task error, job id is {},task id is {}", jobId, taskId, e);
}
}
public boolean isCancelled() {
return status.equals(TaskStatus.CANCEL);
}
}

View File

@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.task;
/**
* The Task interface represents a task that can be executed and managed by a scheduler.
* All extension tasks must implement this interface.
* The methods defined in this interface are automatically called by the scheduler before and after the execution
* of the run method.
*/
public interface Task {
/**
* This method is called before the task is executed.
* Implementations can use this method to perform any necessary setup or initialization.
*/
void before();
/**
* This method contains the main logic of the task.
* Implementations should define the specific actions to be performed by the task.
*/
void run();
/**
* This method is called when the task fails to execute successfully.
* Implementations can use this method to handle any failure scenarios.
*/
void onFail();
/**
* This method is called when the task fails to execute successfully, with an additional error message.
* Implementations can use this method to handle any failure scenarios and provide a custom error message.
*
* @param msg The error message associated with the failure.
*/
void onFail(String msg);
/**
* This method is called when the task executes successfully.
* Implementations can use this method to handle successful execution scenarios.
*/
void onSuccess();
/**
* This method is called to cancel the execution of the task.
* Implementations should define the necessary steps to cancel the task.
*/
void cancel();
}
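For illustration, a minimal (hypothetical) implementation showing the lifecycle hooks the scheduler drives; real tasks would extend AbstractTask, which already wires status bookkeeping into these hooks.

```java
// Illustrative only; not part of this commit.
public class LoggingTask implements Task {
    @Override
    public void before() {
        System.out.println("setup: allocate resources, mark RUNNING");
    }

    @Override
    public void run() {
        System.out.println("main task logic goes here");
    }

    @Override
    public void onFail() {
        System.out.println("failure handling");
    }

    @Override
    public void onFail(String msg) {
        System.out.println("failure handling with message: " + msg);
    }

    @Override
    public void onSuccess() {
        System.out.println("success handling");
    }

    @Override
    public void cancel() {
        System.out.println("stop in-flight work");
    }
}
```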

View File

@ -44,6 +44,7 @@ import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.datasource.InitTableLog;
import org.apache.doris.ha.MasterInfo;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.journal.bdbje.Timestamp;
import org.apache.doris.load.DeleteInfo;
import org.apache.doris.load.ExportJob;
@ -117,8 +118,6 @@ import org.apache.doris.policy.DropPolicyLog;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.system.Backend;
@ -538,16 +537,14 @@ public class JournalEntity implements Writable {
case OperationType.OP_UPDATE_SCHEDULER_JOB:
case OperationType.OP_DELETE_SCHEDULER_JOB:
case OperationType.OP_CREATE_SCHEDULER_JOB: {
Job job = Job.readFields(in);
AbstractJob job = AbstractJob.readFields(in);
data = job;
isRead = true;
break;
}
case OperationType.OP_CREATE_SCHEDULER_TASK:
case OperationType.OP_DELETE_SCHEDULER_TASK: {
JobTask task = JobTask.readFields(in);
data = task;
isRead = true;
//todo improve
break;
}
case OperationType.OP_CREATE_LOAD_JOB: {

View File

@ -18,8 +18,8 @@
package org.apache.doris.mtmv;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.scheduler.common.IntervalUnit;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;

View File

@ -28,6 +28,7 @@ import org.apache.doris.catalog.KeysType;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.mtmv.MTMVRefreshEnum.BuildMode;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
@ -375,7 +376,6 @@ import org.apache.doris.policy.FilterType;
import org.apache.doris.policy.PolicyTypeEnum;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.scheduler.common.IntervalUnit;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

View File

@ -52,6 +52,7 @@ import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.ha.MasterInfo;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.journal.Journal;
import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
@ -78,8 +79,6 @@ import org.apache.doris.policy.DropPolicyLog;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.TableStatsMeta;
@ -662,21 +661,21 @@ public class EditLog {
break;
}
case OperationType.OP_CREATE_SCHEDULER_JOB: {
Job job = (Job) journal.getData();
Env.getCurrentEnv().getAsyncJobManager().replayCreateJob(job);
AbstractJob job = (AbstractJob) journal.getData();
Env.getCurrentEnv().getJobManager().replayCreateJob(job);
break;
}
case OperationType.OP_UPDATE_SCHEDULER_JOB: {
Job job = (Job) journal.getData();
Env.getCurrentEnv().getAsyncJobManager().replayUpdateJob(job);
AbstractJob job = (AbstractJob) journal.getData();
Env.getCurrentEnv().getJobManager().replayUpdateJob(job);
break;
}
case OperationType.OP_DELETE_SCHEDULER_JOB: {
Job job = (Job) journal.getData();
Env.getCurrentEnv().getAsyncJobManager().replayDeleteJob(job);
AbstractJob job = (AbstractJob) journal.getData();
Env.getCurrentEnv().getJobManager().replayDeleteJob(job);
break;
}
case OperationType.OP_CREATE_SCHEDULER_TASK: {
/*case OperationType.OP_CREATE_SCHEDULER_TASK: {
JobTask task = (JobTask) journal.getData();
Env.getCurrentEnv().getJobTaskManager().replayCreateTask(task);
break;
@ -685,7 +684,7 @@ public class EditLog {
JobTask task = (JobTask) journal.getData();
Env.getCurrentEnv().getJobTaskManager().replayDeleteTask(task);
break;
}
}*/
case OperationType.OP_CHANGE_ROUTINE_LOAD_JOB: {
RoutineLoadOperation operation = (RoutineLoadOperation) journal.getData();
Env.getCurrentEnv().getRoutineLoadManager().replayChangeRoutineLoadJob(operation);
@ -1603,23 +1602,15 @@ public class EditLog {
logEdit(OperationType.OP_CREATE_ROUTINE_LOAD_JOB, routineLoadJob);
}
public void logCreateJob(Job job) {
public void logCreateJob(AbstractJob job) {
logEdit(OperationType.OP_CREATE_SCHEDULER_JOB, job);
}
public void logUpdateJob(Job job) {
public void logUpdateJob(AbstractJob job) {
logEdit(OperationType.OP_UPDATE_SCHEDULER_JOB, job);
}
public void logCreateJobTask(JobTask jobTask) {
logEdit(OperationType.OP_CREATE_SCHEDULER_TASK, jobTask);
}
public void logDeleteJobTask(JobTask jobTask) {
logEdit(OperationType.OP_DELETE_SCHEDULER_TASK, jobTask);
}
public void logDeleteJob(Job job) {
public void logDeleteJob(AbstractJob job) {
logEdit(OperationType.OP_DELETE_SCHEDULER_JOB, job);
}

View File

@ -77,6 +77,8 @@ import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonHMSExternalCatalog;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
import org.apache.doris.load.routineload.AbstractDataSourceProperties;
@ -86,8 +88,6 @@ import org.apache.doris.load.sync.canal.CanalSyncJob;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.RowPolicy;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.scheduler.executor.JobExecutor;
import org.apache.doris.scheduler.executor.SqlJobExecutor;
import org.apache.doris.system.BackendHbResponse;
import org.apache.doris.system.BrokerHbResponse;
import org.apache.doris.system.FrontendHbResponse;
@ -221,10 +221,10 @@ public class GsonUtils {
RuntimeTypeAdapterFactory.of(
AbstractDataSourceProperties.class, "clazz")
.registerSubtype(KafkaDataSourceProperties.class, KafkaDataSourceProperties.class.getSimpleName());
private static RuntimeTypeAdapterFactory<JobExecutor> jobExecutorRuntimeTypeAdapterFactory =
private static RuntimeTypeAdapterFactory<AbstractJob> jobExecutorRuntimeTypeAdapterFactory =
RuntimeTypeAdapterFactory.of(
JobExecutor.class, "clazz")
.registerSubtype(SqlJobExecutor.class, SqlJobExecutor.class.getSimpleName());
AbstractJob.class, "clazz")
.registerSubtype(InsertJob.class, InsertJob.class.getSimpleName());
private static RuntimeTypeAdapterFactory<DatabaseIf> dbTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
DatabaseIf.class, "clazz")
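When a new AbstractJob subtype is added, it would presumably be chained onto the same factory so GSON can round-trip it polymorphically; a hedged sketch (MTMVJob is a hypothetical subtype here):

```java
// Illustrative only: mirrors the InsertJob registration above.
private static RuntimeTypeAdapterFactory<AbstractJob> jobTypeAdapterFactorySketch =
        RuntimeTypeAdapterFactory.of(AbstractJob.class, "clazz")
                .registerSubtype(InsertJob.class, InsertJob.class.getSimpleName())
                .registerSubtype(MTMVJob.class, MTMVJob.class.getSimpleName());
```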

View File

@ -232,10 +232,6 @@ public class MetaPersistMethod {
Env.class.getDeclaredMethod("saveAsyncJobManager", CountingDataOutputStream.class, long.class);
break;
case "JobTaskManager":
metaPersistMethod.readMethod =
Env.class.getDeclaredMethod("loadJobTaskManager", DataInputStream.class, long.class);
metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveJobTaskManager", CountingDataOutputStream.class, long.class);
break;
default:
break;

View File

@ -39,7 +39,7 @@ public class PersistMetaModules {
"globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler",
"paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles",
"plugins", "deleteHandler", "sqlBlockRule", "policy", "globalFunction", "workloadGroups",
"binlogs", "resourceGroups", "AnalysisMgrV2", "AsyncJobManager", "JobTaskManager");
"binlogs", "resourceGroups", "AnalysisMgrV2", "AsyncJobManager");
// Modules in this list is deprecated and will not be saved in meta file. (also should not be in MODULE_NAMES)
public static final ImmutableList<String> DEPRECATED_MODULE_NAMES = ImmutableList.of(

View File

@ -121,9 +121,9 @@ import org.apache.doris.catalog.EncryptKeyHelper;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.load.sync.SyncJobManager;
import org.apache.doris.persist.CleanQueryStatsInfo;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.statistics.StatisticsRepository;
import org.apache.logging.log4j.LogManager;
@ -184,16 +184,16 @@ public class DdlExecutor {
} else if (ddlStmt instanceof AlterRoutineLoadStmt) {
env.getRoutineLoadManager().alterRoutineLoadJob((AlterRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof CreateJobStmt) {
env.getJobRegister().registerJob((((CreateJobStmt) ddlStmt).getJob()));
env.getJobManager().registerJob(((CreateJobStmt) ddlStmt).getJobInstance());
} else if (ddlStmt instanceof StopJobStmt) {
StopJobStmt stmt = (StopJobStmt) ddlStmt;
env.getJobRegister().stopJob(stmt.getDbFullName(), stmt.getName(), JobCategory.SQL);
env.getJobManager().unregisterJob(stmt.getDbFullName(), stmt.getName());
} else if (ddlStmt instanceof PauseJobStmt) {
PauseJobStmt stmt = (PauseJobStmt) ddlStmt;
env.getJobRegister().pauseJob(stmt.getDbFullName(), stmt.getName(), JobCategory.SQL);
env.getJobManager().alterJobStatus(stmt.getDbFullName(), stmt.getName(), JobStatus.PAUSED);
} else if (ddlStmt instanceof ResumeJobStmt) {
ResumeJobStmt stmt = (ResumeJobStmt) ddlStmt;
env.getJobRegister().resumeJob(stmt.getDbFullName(), stmt.getName(), JobCategory.SQL);
env.getJobManager().alterJobStatus(stmt.getDbFullName(), stmt.getName(), JobStatus.RUNNING);
} else if (ddlStmt instanceof CreateUserStmt) {
CreateUserStmt stmt = (CreateUserStmt) ddlStmt;
env.getAuth().createUser(stmt);

View File

@ -189,8 +189,6 @@ import org.apache.doris.load.LoadJob.JobState;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
@ -429,7 +427,7 @@ public class ShowExecutor {
} else if (stmt instanceof ShowJobStmt) {
handleShowJob();
} else if (stmt instanceof ShowJobTaskStmt) {
handleShowJobTask();
//handleShowJobTask();
} else if (stmt instanceof ShowConvertLSCStmt) {
handleShowConvertLSC();
} else {
@ -1418,7 +1416,7 @@ public class ShowExecutor {
resultSet = new ShowResultSet(showWarningsStmt.getMetaData(), rows);
}
private void handleShowJobTask() {
/*private void handleShowJobTask() {
ShowJobTaskStmt showJobTaskStmt = (ShowJobTaskStmt) stmt;
List<List<String>> rows = Lists.newArrayList();
List<Job> jobs = Env.getCurrentEnv().getJobRegister()
@ -1439,21 +1437,15 @@ public class ShowExecutor {
rows.add(jobTask.getShowInfo(job.getJobName()));
}
resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows);
}
}*/
private void handleShowJob() throws AnalysisException {
ShowJobStmt showJobStmt = (ShowJobStmt) stmt;
List<List<String>> rows = Lists.newArrayList();
// if job exists
List<Job> jobList;
PatternMatcher matcher = null;
if (showJobStmt.getPattern() != null) {
matcher = PatternMatcherWrapper.createMysqlPattern(showJobStmt.getPattern(),
CaseSensibility.JOB.getCaseSensibility());
}
jobList = Env.getCurrentEnv().getJobRegister()
.getJobs(showJobStmt.getDbFullName(), showJobStmt.getName(), showJobStmt.getJobCategory(),
matcher);
List<org.apache.doris.job.base.AbstractJob> jobList;
jobList = Env.getCurrentEnv().getJobManager()
.queryJobs(showJobStmt.getDbFullName(), showJobStmt.getName());
if (jobList.isEmpty()) {
resultSet = new ShowResultSet(showJobStmt.getMetaData(), rows);
@ -1461,15 +1453,9 @@ public class ShowExecutor {
}
// check auth
for (Job job : jobList) {
if (!Env.getCurrentEnv().getAccessManager()
.checkDbPriv(ConnectContext.get(), job.getDbName(), PrivPredicate.SHOW)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR,
ConnectContext.get().getQualifiedUser(), job.getDbName());
}
}
for (Job job : jobList) {
rows.add(job.getShowInfo());
for (org.apache.doris.job.base.AbstractJob job : jobList) {
rows.add(job.getCommonShowInfo());
}
resultSet = new ShowResultSet(showJobStmt.getMetaData(), rows);
}

View File

@ -1,58 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.constants;
import lombok.Getter;
/**
* The job category is used to distinguish different types of jobs.
*/
public enum JobCategory {
COMMON(1, "common", true),
SQL(2, "sql", true),
MTMV(3, "mtmv", false),
;
@Getter
private int code;
@Getter
private String name;
/**
* if the job is persistent, it will be saved to the metadata store.
* if the job is not persistent, it will not be saved to the memory.
*/
@Getter
private boolean persistent;
JobCategory(int code, String name, boolean persistent) {
this.code = code;
this.name = name;
this.persistent = persistent;
}
public static JobCategory getJobCategoryByName(String name) {
for (JobCategory jobCategory : JobCategory.values()) {
if (jobCategory.name.equalsIgnoreCase(name)) {
return jobCategory;
}
}
throw new IllegalArgumentException("Unknown job category name: " + name);
}
}

View File

@ -19,7 +19,6 @@ package org.apache.doris.scheduler.disruptor;
import org.apache.doris.common.Config;
import org.apache.doris.scheduler.constants.TaskType;
import org.apache.doris.scheduler.manager.TimerJobManager;
import org.apache.doris.scheduler.manager.TransientTaskManager;
import com.lmax.disruptor.BlockingWaitStrategy;
@ -48,8 +47,6 @@ import java.util.concurrent.TimeUnit;
public class TaskDisruptor implements Closeable {
private Disruptor<TaskEvent> disruptor;
private TimerJobManager timerJobManager;
private TransientTaskManager transientTaskManager;
private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size;
@ -77,8 +74,7 @@ public class TaskDisruptor implements Closeable {
event.setTaskType(taskType);
};
public TaskDisruptor(TimerJobManager timerJobManager, TransientTaskManager transientTaskManager) {
this.timerJobManager = timerJobManager;
public TaskDisruptor(TransientTaskManager transientTaskManager) {
this.transientTaskManager = transientTaskManager;
}
@ -88,7 +84,7 @@ public class TaskDisruptor implements Closeable {
ProducerType.SINGLE, new BlockingWaitStrategy());
WorkHandler<TaskEvent>[] workers = new TaskHandler[consumerThreadCount];
for (int i = 0; i < consumerThreadCount; i++) {
workers[i] = new TaskHandler(timerJobManager, transientTaskManager);
workers[i] = new TaskHandler(transientTaskManager);
}
disruptor.handleEventsWithWorkerPool(workers);
disruptor.start();

View File

@ -17,16 +17,8 @@
package org.apache.doris.scheduler.disruptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.scheduler.constants.JobType;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.scheduler.job.ExecutorResult;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.scheduler.manager.JobTaskManager;
import org.apache.doris.scheduler.manager.TimerJobManager;
import org.apache.doris.scheduler.manager.TransientTaskManager;
import com.lmax.disruptor.WorkHandler;
@ -42,22 +34,10 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TaskHandler implements WorkHandler<TaskEvent> {
/**
* The event job manager used to retrieve and execute event jobs.
*/
private TimerJobManager timerJobManager;
private TransientTaskManager transientTaskManager;
private JobTaskManager jobTaskManager;
/**
* Constructs a new {@link TaskHandler} instance with the specified event job manager.
*
* @param timerJobManager The event job manager used to retrieve and execute event jobs.
*/
public TaskHandler(TimerJobManager timerJobManager, TransientTaskManager transientTaskManager) {
this.timerJobManager = timerJobManager;
public TaskHandler(TransientTaskManager transientTaskManager) {
this.transientTaskManager = transientTaskManager;
}
@ -71,10 +51,6 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
@Override
public void onEvent(TaskEvent event) {
switch (event.getTaskType()) {
case SCHEDULER_JOB_TASK:
case MANUAL_JOB_TASK:
onTimerJobTaskHandle(event);
break;
case TRANSIENT_TASK:
onTransientTaskHandle(event);
break;
@ -84,71 +60,6 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
}
}
/**
* Processes an event task by retrieving the associated event job and executing it if it is running.
*
* @param taskEvent The event task to be processed.
*/
@SuppressWarnings("checkstyle:UnusedLocalVariable")
public void onTimerJobTaskHandle(TaskEvent taskEvent) {
long jobId = taskEvent.getId();
long taskId = taskEvent.getTaskId();
JobTask jobTask = jobTaskManager.pollPrepareTaskByTaskId(jobId, taskId);
if (jobTask == null) {
log.warn("jobTask is null, maybe it's cancel, jobId: {}, taskId: {}", jobId, taskId);
return;
}
Job job = timerJobManager.getJob(jobId);
if (job == null) {
log.info("job is null, jobId: {}", jobId);
return;
}
if (!job.isRunning()) {
log.info("job is not running, eventJobId: {}", jobId);
return;
}
log.debug("job is running, eventJobId: {}", jobId);
try {
jobTask.setStartTimeMs(System.currentTimeMillis());
ExecutorResult result = job.getExecutor().execute(job, jobTask.getContextData());
job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis());
if (job.getJobType().equals(JobType.RECURRING)) {
updateJobStatusIfPastEndTime(job);
} else {
// one time job should be finished after execute
updateOnceTimeJobStatus(job);
}
if (null == result) {
log.warn("Job execute failed, jobId: {}, result is null", jobId);
jobTask.setErrorMsg("Job execute failed, result is null");
jobTask.setIsSuccessful(false);
timerJobManager.setJobLatestStatus(jobId, false);
return;
}
String resultStr = GsonUtils.GSON.toJson(result.getResult());
jobTask.setExecuteResult(resultStr);
jobTask.setIsSuccessful(result.isSuccess());
if (!result.isSuccess()) {
log.warn("Job execute failed, jobId: {}, msg : {}", jobId, result.getExecutorSql());
jobTask.setErrorMsg(result.getErrorMsg());
}
jobTask.setExecuteSql(result.getExecutorSql());
} catch (Exception e) {
log.warn("Job execute failed, jobId: {}, msg : {}", jobId, e.getMessage());
jobTask.setErrorMsg(e.getMessage());
jobTask.setIsSuccessful(false);
}
jobTask.setEndTimeMs(System.currentTimeMillis());
if (null == jobTaskManager) {
jobTaskManager = Env.getCurrentEnv().getJobTaskManager();
}
boolean isPersistent = job.getJobCategory().isPersistent();
jobTaskManager.addJobTask(jobTask, isPersistent);
timerJobManager.setJobLatestStatus(jobId, jobTask.getIsSuccessful());
}
public void onTransientTaskHandle(TaskEvent taskEvent) {
Long taskId = taskEvent.getId();
TransientTaskExecutor taskExecutor = transientTaskManager.getMemoryTaskExecutor(taskId);
@ -164,18 +75,4 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
}
}
private void updateJobStatusIfPastEndTime(Job job) {
if (job.isExpired()) {
timerJobManager.finishJob(job.getJobId());
}
}
private void updateOnceTimeJobStatus(Job job) {
if (job.getJobType().equals(JobType.STREAMING)) {
timerJobManager.putOneJobToQueen(job.getJobId());
return;
}
job.finish();
}
}

View File

@ -1,54 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.executor;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.thrift.TUniqueId;
import lombok.Getter;
import java.util.UUID;
@Getter
public abstract class AbstractJobExecutor<T, C> implements JobExecutor<T, C> {
protected ConnectContext createContext(Job job) {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName()));
ctx.setDatabase(job.getDbName());
ctx.setQualifiedUser(job.getUser());
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(job.getUser(), "%"));
ctx.getState().reset();
ctx.setThreadLocalInfo();
return ctx;
}
protected String generateTaskId() {
return UUID.randomUUID().toString();
}
protected TUniqueId generateQueryId(String taskIdString) {
UUID taskId = UUID.fromString(taskIdString);
return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
}
}

View File

@ -1,46 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.executor;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.job.ExecutorResult;
import org.apache.doris.scheduler.job.Job;
/**
* This interface represents a callback for an event registration. All event registrations
* must implement this interface to provide an execution method.
* We will persist JobExecutor in the database, and then execute it when the scheduler starts.
* We use Gson to serialize and deserialize JobExecutor. so the implementation of JobExecutor needs to be serializable.
* You can see @org.apache.doris.persist.gson.GsonUtils.java for details.When you implement JobExecutor,pls make sure
* you can serialize and deserialize it.
*/
@FunctionalInterface
public interface JobExecutor<T, C> {
/**
* Executes the event job and returns the result.
* Exceptions will be caught internally, so there is no need to define or throw them separately.
*
* @param job The event job to execute.
* @param dataContext The data context of the event job. if you need to pass parameters to the event job,
* you can use it.
* @return The result of the event job execution.
*/
ExecutorResult<T> execute(Job job, C dataContext) throws JobException;
}

View File

@ -1,79 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.executor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.job.ExecutorResult;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.thrift.TUniqueId;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
/**
* we use this executor to execute sql job
*/
@Getter
@Slf4j
public class SqlJobExecutor extends AbstractJobExecutor<String, Map<String, Object>> {
@Setter
@SerializedName(value = "sql")
private String sql;
public SqlJobExecutor(String sql) {
this.sql = sql;
}
@Override
public ExecutorResult<String> execute(Job job, Map<String, Object> dataContext) throws JobException {
ConnectContext ctx = createContext(job);
String taskIdString = generateTaskId();
TUniqueId queryId = generateQueryId(taskIdString);
try {
StmtExecutor executor = new StmtExecutor(ctx, sql);
executor.execute(queryId);
String result = convertExecuteResult(ctx, taskIdString);
return new ExecutorResult<>(result, true, null, sql);
} catch (Exception e) {
log.warn("execute sql job failed, job id :{}, sql: {}, error: {}", job.getJobId(), sql, e);
return new ExecutorResult<>(null, false, e.getMessage(), sql);
}
}
private String convertExecuteResult(ConnectContext ctx, String queryId) throws JobException {
if (null == ctx.getState()) {
throw new JobException("execute sql job failed, sql: " + sql + ", error: response state is null");
}
if (null != ctx.getState().getErrorCode()) {
throw new JobException("error code: " + ctx.getState().getErrorCode() + ", error msg: "
+ ctx.getState().getErrorMessage());
}
return "queryId:" + queryId + ",affectedRows : " + ctx.getState().getAffectedRows() + ", warningRows: "
+ ctx.getState().getWarningRows() + ",infoMsg" + ctx.getState().getInfoMessage();
}
}

View File

@ -1,292 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.job;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.scheduler.common.IntervalUnit;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.constants.JobStatus;
import org.apache.doris.scheduler.constants.JobType;
import org.apache.doris.scheduler.executor.JobExecutor;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
/**
* Job is the core of the scheduler module, which is used to store the Job information of the job module.
* We can use the job to uniquely identify a Job.
* The jobName is used to identify the job, which is not unique.
* The jobStatus is used to identify the status of the Job, which is used to control the execution of the
* job.
*/
@Data
public class Job implements Writable {
public Job(String jobName, Long intervalMilliSeconds, Long startTimeMs, Long endTimeMs,
JobExecutor executor) {
this.jobName = jobName;
this.executor = executor;
this.intervalMs = intervalMilliSeconds;
this.startTimeMs = null == startTimeMs ? 0L : startTimeMs;
this.endTimeMs = null == endTimeMs ? 0L : endTimeMs;
this.jobStatus = JobStatus.RUNNING;
this.jobId = Env.getCurrentEnv().getNextId();
}
public Job() {
this.jobId = Env.getCurrentEnv().getNextId();
}
@SerializedName("jobId")
private Long jobId;
@SerializedName("jobName")
private String jobName;
@SerializedName("dbName")
private String dbName;
/**
* The status of the job, which is used to control the execution of the job.
*
* @see JobStatus
*/
@SerializedName("jobStatus")
private JobStatus jobStatus;
@SerializedName("jobType")
private JobType jobType = JobType.RECURRING;
/**
* The executor of the job.
*
* @see JobExecutor
*/
@SerializedName("executor")
private JobExecutor executor;
@SerializedName("baseName")
private String baseName;
@SerializedName("user")
private String user;
@SerializedName("intervalMs")
private Long intervalMs = 0L;
@SerializedName("startTimeMs")
private Long startTimeMs = 0L;
@SerializedName("endTimeMs")
private Long endTimeMs = 0L;
@SerializedName("timezone")
private String timezone;
@SerializedName("jobCategory")
private JobCategory jobCategory;
@SerializedName("latestStartExecuteTimeMs")
private Long latestStartExecuteTimeMs = 0L;
@SerializedName("latestCompleteExecuteTimeMs")
private Long latestCompleteExecuteTimeMs = 0L;
@SerializedName("intervalUnit")
private IntervalUnit intervalUnit;
@SerializedName("originInterval")
private Long originInterval;
@SerializedName("nextExecuteTimeMs")
private Long nextExecuteTimeMs = 0L;
@SerializedName("createTimeMs")
private Long createTimeMs = System.currentTimeMillis();
private Boolean lastExecuteTaskStatus;
@SerializedName("comment")
private String comment;
@SerializedName("errMsg")
private String errMsg;
/**
* If we want to start the job immediately, we can set this flag to true (the default is false).
* When the flag is set, the start time will be set to the current time.
* This field does not need to be serialized.
*/
private boolean immediatelyStart = false;
public boolean isRunning() {
return jobStatus == JobStatus.RUNNING;
}
public boolean isStopped() {
return jobStatus == JobStatus.STOPPED;
}
public boolean isFinished() {
return jobStatus == JobStatus.FINISHED;
}
public boolean isExpired(long nextExecuteTimestamp) {
if (endTimeMs == 0L) {
return false;
}
return nextExecuteTimestamp > endTimeMs;
}
public boolean isTaskTimeExceeded() {
if (endTimeMs == 0L) {
return false;
}
return System.currentTimeMillis() >= endTimeMs || nextExecuteTimeMs > endTimeMs;
}
public boolean isExpired() {
if (endTimeMs == 0L) {
return false;
}
return System.currentTimeMillis() >= endTimeMs;
}
public Long getExecuteTimestampAndGeneratorNext() {
this.latestStartExecuteTimeMs = nextExecuteTimeMs;
// TODO: consider scheduling delay. If the delay exceeds the ten-minute time window,
// should the task be dropped or executed in the next window?
this.nextExecuteTimeMs = latestStartExecuteTimeMs + intervalMs;
return nextExecuteTimeMs;
}
public void pause() {
this.jobStatus = JobStatus.PAUSED;
}
public void pause(String errMsg) {
this.jobStatus = JobStatus.PAUSED;
this.errMsg = errMsg;
}
public void finish() {
this.jobStatus = JobStatus.FINISHED;
}
public void resume() {
this.jobStatus = JobStatus.RUNNING;
}
public void stop() {
this.jobStatus = JobStatus.STOPPED;
}
public void checkJobParam() throws DdlException {
if (null == jobCategory) {
throw new DdlException("jobCategory must be set");
}
if (null == executor) {
throw new DdlException("Job executor must be set");
}
if (null == jobType) {
throw new DdlException("Job type must be set");
}
if (jobType.equals(JobType.MANUAL)) {
return;
}
if (startTimeMs != 0L && startTimeMs < System.currentTimeMillis()) {
throw new DdlException("startTimeMs must be greater than current time");
}
if (immediatelyStart && startTimeMs != 0L) {
throw new DdlException("immediately start and startTimeMs can't be set at the same time");
}
if (immediatelyStart) {
startTimeMs = System.currentTimeMillis();
}
if (endTimeMs != 0L && endTimeMs < System.currentTimeMillis()) {
throw new DdlException("endTimeMs must be greater than current time");
}
if (null != intervalUnit && null != originInterval) {
this.intervalMs = intervalUnit.getParameterValue(originInterval);
}
if (jobType.equals(JobType.RECURRING) && (intervalMs == null || intervalMs <= 0L)) {
throw new DdlException("cycle job must set intervalMs");
}
}
@Override
public void write(DataOutput out) throws IOException {
String jobData = GsonUtils.GSON.toJson(this);
Text.writeString(out, jobData);
}
public static Job readFields(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), Job.class);
}
public List<String> getShowInfo() {
List<String> row = Lists.newArrayList();
row.add(String.valueOf(jobId));
row.add(jobName);
row.add(user);
row.add(jobType.name());
row.add(convertRecurringStrategyToString());
row.add(jobStatus.name());
row.add(null == lastExecuteTaskStatus ? "null" : lastExecuteTaskStatus.toString());
row.add(createTimeMs <= 0L ? "null" : TimeUtils.longToTimeString(createTimeMs));
row.add(comment == null ? "null" : comment);
return row;
}
private String convertRecurringStrategyToString() {
if (jobType.equals(JobType.MANUAL)) {
return "MANUAL TRIGGER";
}
switch (jobType) {
case ONE_TIME:
return "AT " + TimeUtils.longToTimeString(startTimeMs);
case RECURRING:
String result = "EVERY " + originInterval + " " + intervalUnit.name();
if (startTimeMs > 0) {
result += " STARTS " + TimeUtils.longToTimeString(startTimeMs);
}
if (endTimeMs > 0) {
result += " ENDS " + TimeUtils.longToTimeString(endTimeMs);
}
return result;
case STREAMING:
return "STREAMING" + (startTimeMs > 0 ? " AT " + TimeUtils.longToTimeString(startTimeMs) : "");
case MANUAL:
return "MANUAL TRIGGER";
default:
return "UNKNOWN";
}
}
}
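checkJobParam above derives intervalMs from intervalUnit.getParameterValue(originInterval). A sketch of what that conversion plausibly does, assuming each IntervalUnit is a fixed multiplier into milliseconds (the real enum lives in org.apache.doris.scheduler.common and may carry more units):

import java.util.concurrent.TimeUnit;

// Illustrative stand-in for IntervalUnit: converts a user-facing interval
// count into the milliseconds that checkJobParam stores in intervalMs.
enum IntervalUnitSketch {
    SECOND(TimeUnit.SECONDS), MINUTE(TimeUnit.MINUTES), HOUR(TimeUnit.HOURS), DAY(TimeUnit.DAYS);

    private final TimeUnit unit;

    IntervalUnitSketch(TimeUnit unit) {
        this.unit = unit;
    }

    long getParameterValue(long originInterval) {
        return unit.toMillis(originInterval);
    }

    public static void main(String[] args) {
        // EVERY 5 MINUTE -> intervalMs = 300000
        System.out.println(MINUTE.getParameterValue(5));
    }
}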


@@ -1,136 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.job;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.scheduler.constants.TaskType;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
@Data
@Slf4j
public class JobTask<T> implements Writable {
@SerializedName("jobId")
private Long jobId;
@SerializedName("taskId")
private Long taskId;
@SerializedName("createTimeMs")
private Long createTimeMs;
@SerializedName("startTimeMs")
private Long startTimeMs;
@SerializedName("endTimeMs")
private Long endTimeMs;
@SerializedName("successful")
private Boolean isSuccessful;
@SerializedName("executeSql")
private String executeSql;
@SerializedName("executeResult")
private String executeResult;
@SerializedName("errorMsg")
private String errorMsg;
@SerializedName("contextDataStr")
private String contextDataStr;
@SerializedName("taskType")
private TaskType taskType = TaskType.SCHEDULER_JOB_TASK;
/**
* Task-specific parameters needed to execute the task.
* E.g. for a SQL task whose statement is: select * from table where id = 1 order by id desc limit ${limit}
* offset ${offset}, contextData is a map such as {limit: 10, offset: 1}; when the task executes, ${limit} is
* replaced with 10 and ${offset} with 1, so the executed SQL is:
* select * from table where id = 1 order by id desc limit 10 offset 1.
*/
private T contextData;
public JobTask(Long jobId, Long taskId, Long createTimeMs) {
//it's enough to use nanoTime to identify a task
this.taskId = taskId;
this.jobId = jobId;
this.createTimeMs = createTimeMs;
}
public JobTask(Long jobId, Long taskId, Long createTimeMs, T contextData) {
this(jobId, taskId, createTimeMs);
this.contextData = contextData;
try {
this.contextDataStr = GsonUtils.GSON.toJson(contextData);
} catch (Exception e) {
this.contextDataStr = null;
log.error("contextData serialize failed, jobId: {}, taskId: {}", jobId, taskId, e);
}
}
public List<String> getShowInfo(String jobName) {
List<String> row = Lists.newArrayList();
row.add(String.valueOf(taskId));
row.add(String.valueOf(jobId));
row.add(jobName);
if (null != createTimeMs) {
row.add(TimeUtils.longToTimeString(createTimeMs));
} else {
row.add("null");
}
row.add(TimeUtils.longToTimeString(startTimeMs));
row.add(null == endTimeMs ? "null" : TimeUtils.longToTimeString(endTimeMs));
if (endTimeMs == null) {
row.add("RUNNING");
} else {
row.add(isSuccessful ? "SUCCESS" : "FAILED");
}
if (null == executeSql) {
row.add("null");
} else {
row.add(executeSql);
}
if (null == executeResult) {
row.add("null");
} else {
row.add(executeResult);
}
if (null == errorMsg) {
row.add("null");
} else {
row.add(errorMsg);
}
row.add(taskType.name());
return row;
}
@Override
public void write(DataOutput out) throws IOException {
String jobData = GsonUtils.GSON.toJson(this);
Text.writeString(out, jobData);
}
public static JobTask readFields(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), JobTask.class);
}
}
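The contextData substitution described in the field comment above is, in effect, ${key} templating over the SQL text. A self-contained sketch of that replacement; the real substitution happens in the executor layer, and the helper name here is hypothetical:

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ContextDataSubstitution {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(\\w+)}");

    // Replace each ${key} in the SQL template with the matching contextData value.
    static String apply(String sqlTemplate, Map<String, Object> contextData) {
        Matcher m = PLACEHOLDER.matcher(sqlTemplate);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            Object value = contextData.get(m.group(1));
            m.appendReplacement(sb, Matcher.quoteReplacement(String.valueOf(value)));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> ctx = new HashMap<>();
        ctx.put("limit", 10);
        ctx.put("offset", 1);
        // prints: select * from table where id = 1 order by id desc limit 10 offset 1
        System.out.println(apply(
                "select * from table where id = 1 order by id desc limit ${limit} offset ${offset}", ctx));
    }
}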


@@ -1,57 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.job;
import org.apache.doris.scheduler.disruptor.TaskDisruptor;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import lombok.Getter;
/**
* This class represents a timer task that can be scheduled by a Netty timer.
* When the timer task is triggered, it produces a Job task using the Disruptor.
* The Job task contains the ID of the Job and the ID of the task itself.
*/
@Getter
public class TimerJobTask implements TimerTask {
private final Long jobId;
// more fields may be added here and recorded in the future
private final Long taskId;
private final Long startTimestamp;
private final TaskDisruptor taskDisruptor;
public TimerJobTask(Long jobId, Long taskId, Long startTimestamp, TaskDisruptor taskDisruptor) {
this.jobId = jobId;
this.startTimestamp = startTimestamp;
this.taskDisruptor = taskDisruptor;
this.taskId = taskId;
}
@Override
public void run(Timeout timeout) {
if (timeout.isCancelled()) {
return;
}
taskDisruptor.tryPublish(jobId, taskId);
}
}
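TimerJobTask plugs straight into Netty's HashedWheelTimer: once the delay elapses, the wheel calls run(Timeout), which forwards the (jobId, taskId) pair to the Disruptor. A runnable sketch of that scheduling flow against the Netty timer API, with the publish step replaced by a print:

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;

import java.util.concurrent.TimeUnit;

public class TimeWheelSketch {
    public static void main(String[] args) throws InterruptedException {
        // Same construction as TimerJobManager below: 1-second ticks, 660 slots.
        HashedWheelTimer timer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660);
        timer.start();
        long jobId = 1L;
        long taskId = System.nanoTime();
        // After ~3 seconds the wheel fires; the real task would call
        // taskDisruptor.tryPublish(jobId, taskId) here instead of printing.
        Timeout timeout = timer.newTimeout(t -> {
            if (!t.isCancelled()) {
                System.out.println("trigger job " + jobId + ", task " + taskId);
            }
        }, 3, TimeUnit.SECONDS);
        // A still-pending timeout can be cancelled, as cancelJobAllTask below does.
        System.out.println("cancelled early? " + timeout.isCancelled());
        Thread.sleep(5000);
        timer.stop();
    }
}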


@@ -1,152 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.manager;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.scheduler.job.JobTask;
import lombok.extern.slf4j.Slf4j;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@Slf4j
public class JobTaskManager implements Writable {
private static final Integer TASK_MAX_NUM = Config.scheduler_job_task_max_saved_count;
private ConcurrentHashMap<Long, ConcurrentLinkedQueue<JobTask>> jobTaskMap = new ConcurrentHashMap<>(16);
/**
* jobId -> (taskId -> JobTask)
* Records tasks that have been prepared but not yet executed;
* an entry is removed once its task runs.
*/
private static ConcurrentHashMap<Long, Map<Long, JobTask>> prepareTaskCreateMsMap = new ConcurrentHashMap<>(16);
public static void addPrepareTask(JobTask jobTask) {
long jobId = jobTask.getJobId();
long taskId = jobTask.getTaskId();
prepareTaskCreateMsMap.computeIfAbsent(jobId, k -> new ConcurrentHashMap<>()).put(taskId, jobTask);
}
public static JobTask pollPrepareTaskByTaskId(Long jobId, Long taskId) {
if (!prepareTaskCreateMsMap.containsKey(jobId) || !prepareTaskCreateMsMap.get(jobId).containsKey(taskId)) {
// if the job is not in the map, return new JobTask
// return new JobTask(jobId, taskId, System.currentTimeMillis()); fixme
return null;
}
return prepareTaskCreateMsMap.get(jobId).remove(taskId);
}
public static void clearPrepareTaskByJobId(Long jobId) {
prepareTaskCreateMsMap.remove(jobId);
}
public void addJobTask(JobTask jobTask, boolean persist) {
ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap
.computeIfAbsent(jobTask.getJobId(), k -> new ConcurrentLinkedQueue<>());
jobTasks.add(jobTask);
if (jobTasks.size() > TASK_MAX_NUM) {
JobTask oldTask = jobTasks.poll();
if (persist) {
Env.getCurrentEnv().getEditLog().logDeleteJobTask(oldTask);
}
}
if (persist) {
Env.getCurrentEnv().getEditLog().logCreateJobTask(jobTask);
}
}
public List<JobTask> getJobTasks(Long jobId) {
if (jobTaskMap.containsKey(jobId)) {
ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap.get(jobId);
List<JobTask> jobTaskList = new LinkedList<>(jobTasks);
Collections.reverse(jobTaskList);
return jobTaskList;
}
return new ArrayList<>();
}
public void replayCreateTask(JobTask task) {
ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap
.computeIfAbsent(task.getJobId(), k -> new ConcurrentLinkedQueue<>());
jobTasks.add(task);
log.info(new LogBuilder(LogKey.SCHEDULER_TASK, task.getTaskId())
.add("msg", "replay create scheduler task").build());
}
public void replayDeleteTask(JobTask task) {
ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap.get(task.getJobId());
if (jobTasks != null) {
jobTasks.remove(task);
}
log.info(new LogBuilder(LogKey.SCHEDULER_TASK, task.getTaskId())
.add("msg", "replay delete scheduler task").build());
}
public void deleteJobTasks(Long jobId) {
ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap.get(jobId);
if (null != jobTasks) {
jobTaskMap.remove(jobId);
}
clearPrepareTaskByJobId(jobId);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(jobTaskMap.size());
for (Map.Entry<Long, ConcurrentLinkedQueue<JobTask>> entry : jobTaskMap.entrySet()) {
out.writeLong(entry.getKey());
out.writeInt(entry.getValue().size());
for (JobTask jobTask : entry.getValue()) {
jobTask.write(out);
}
}
}
public void readFields(DataInput in) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; i++) {
Long jobId = in.readLong();
int taskSize = in.readInt();
ConcurrentLinkedQueue<JobTask> jobTasks = new ConcurrentLinkedQueue<>();
for (int j = 0; j < taskSize; j++) {
JobTask jobTask = JobTask.readFields(in);
jobTasks.add(jobTask);
}
jobTaskMap.put(jobId, jobTasks);
}
}
}
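addJobTask above caps the per-job history at TASK_MAX_NUM by polling the oldest record whenever a new one pushes the queue over the limit (edit-logging the deletion when persistence is on). A minimal sketch of that capped-queue behavior in isolation:

import java.util.concurrent.ConcurrentLinkedQueue;

final class CappedTaskHistory<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final int maxSize;

    CappedTaskHistory(int maxSize) {
        this.maxSize = maxSize;
    }

    // Append a record; evict and return the oldest one when over capacity,
    // mirroring addJobTask's poll-on-overflow. Returns null if nothing was evicted.
    T add(T record) {
        queue.add(record);
        return queue.size() > maxSize ? queue.poll() : null;
    }

    public static void main(String[] args) {
        CappedTaskHistory<String> history = new CappedTaskHistory<>(2);
        history.add("task-1");
        history.add("task-2");
        // prints task-1: it is evicted; the real manager also edit-logs the deletion
        System.out.println(history.add("task-3"));
    }
}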


@@ -1,573 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.manager;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.constants.JobStatus;
import org.apache.doris.scheduler.constants.JobType;
import org.apache.doris.scheduler.constants.TaskType;
import org.apache.doris.scheduler.disruptor.TaskDisruptor;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.scheduler.job.TimerJobTask;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Slf4j
public class TimerJobManager implements Closeable, Writable {
private final ConcurrentHashMap<Long, Job> jobMap = new ConcurrentHashMap<>(128);
private long lastBatchSchedulerTimestamp;
private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 600;
/**
* batch scheduler interval in milliseconds
*/
private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = BATCH_SCHEDULER_INTERVAL_SECONDS * 1000L;
private boolean isClosed = false;
/**
* key: jobId
* value: the timeout map for one job
* used to cancel tasks; a task that has already started cannot be canceled
*/
private final ConcurrentHashMap<Long, Map<Long, Timeout>> jobTimeoutMap = new ConcurrentHashMap<>(128);
/**
* time wheel used to schedule jobs
*/
private HashedWheelTimer dorisTimer;
/**
* Producer/consumer model:
* the Disruptor is used to handle tasks;
* it starts a thread pool to consume them
*/
@Setter
private TaskDisruptor disruptor;
public TimerJobManager() {
this.lastBatchSchedulerTimestamp = System.currentTimeMillis();
}
public void start() {
dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660);
dorisTimer.start();
Long currentTimeMs = System.currentTimeMillis();
jobMap.forEach((jobId, job) -> {
Long nextExecuteTimeMs = findFirstExecuteTime(currentTimeMs, job.getStartTimeMs(),
job.getIntervalMs(), job.getJobType());
job.setNextExecuteTimeMs(nextExecuteTimeMs);
});
batchSchedulerTasks();
cycleSystemSchedulerTasks();
}
public Long registerJob(Job job) throws DdlException {
job.checkJobParam();
checkIsJobNameUsed(job.getDbName(), job.getJobName(), job.getJobCategory());
jobMap.putIfAbsent(job.getJobId(), job);
initAndSchedulerJob(job);
Env.getCurrentEnv().getEditLog().logCreateJob(job);
return job.getJobId();
}
public void replayCreateJob(Job job) {
if (jobMap.containsKey(job.getJobId())) {
return;
}
jobMap.putIfAbsent(job.getJobId(), job);
initAndSchedulerJob(job);
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
.add("msg", "replay create scheduler job").build());
}
/**
* Replay update load job.
**/
public void replayUpdateJob(Job job) {
jobMap.put(job.getJobId(), job);
if (JobStatus.RUNNING.equals(job.getJobStatus())) {
initAndSchedulerJob(job);
}
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
.add("msg", "replay update scheduler job").build());
}
public void replayDeleteJob(Job job) {
if (null == jobMap.get(job.getJobId())) {
return;
}
jobMap.remove(job.getJobId());
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
.add("msg", "replay delete scheduler job").build());
Env.getCurrentEnv().getJobTaskManager().deleteJobTasks(job.getJobId());
}
private void checkIsJobNameUsed(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
Optional<Job> optionalJob = jobMap.values().stream().filter(job -> job.getJobCategory().equals(jobCategory))
.filter(job -> job.getDbName().equals(dbName))
.filter(job -> job.getJobName().equals(jobName)).findFirst();
if (optionalJob.isPresent()) {
throw new DdlException("Name " + jobName + " already used in db " + dbName);
}
}
private void initAndSchedulerJob(Job job) {
if (!job.getJobStatus().equals(JobStatus.RUNNING) || job.getJobType().equals(JobType.MANUAL)) {
return;
}
Long currentTimeMs = System.currentTimeMillis();
Long nextExecuteTimeMs = findFirstExecuteTime(currentTimeMs, job.getStartTimeMs(),
job.getIntervalMs(), job.getJobType());
job.setNextExecuteTimeMs(nextExecuteTimeMs);
if (job.getNextExecuteTimeMs() < lastBatchSchedulerTimestamp) {
List<Long> executeTimestamp = findTasksBetweenTime(job,
lastBatchSchedulerTimestamp,
job.getNextExecuteTimeMs(), job.getJobType());
if (!executeTimestamp.isEmpty()) {
for (Long timestamp : executeTimestamp) {
putOneTask(job.getJobId(), timestamp);
}
}
}
}
private Long findFirstExecuteTime(Long currentTimeMs, Long startTimeMs, Long intervalMs, JobType jobType) {
// if the job is not delayed, the first execute time is its start time
if (startTimeMs != 0L && startTimeMs > currentTimeMs) {
return startTimeMs;
}
// if the job is already delayed, the first execute time is the current time
if (startTimeMs != 0L && startTimeMs < currentTimeMs) {
return currentTimeMs;
}
// if it's a recurring job with no start time set, the first execute time is current time + interval
if (jobType.equals(JobType.RECURRING) && startTimeMs == 0L) {
return currentTimeMs + intervalMs;
}
// otherwise (no start time set, non-recurring), the first execute time is the current time
return currentTimeMs;
}
public <T> boolean immediateExecuteTask(Long jobId, T taskContextData) throws DdlException {
Job job = jobMap.get(jobId);
if (job == null) {
log.warn("immediateExecuteTask failed, jobId: {} not exist", jobId);
return false;
}
if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
log.warn("immediateExecuteTask failed, jobId: {} is not running", jobId);
return false;
}
JobTask jobTask = createInitialTask(jobId, taskContextData);
jobTask.setTaskType(TaskType.MANUAL_JOB_TASK);
JobTaskManager.addPrepareTask(jobTask);
disruptor.tryPublish(jobId, jobTask.getTaskId(), TaskType.MANUAL_JOB_TASK);
return true;
}
public void unregisterJob(Long jobId) {
jobMap.remove(jobId);
}
public void pauseJob(Long jobId) {
Job job = jobMap.get(jobId);
if (job == null) {
log.warn("pauseJob failed, jobId: {} not exist", jobId);
return;
}
if (job.getJobStatus().equals(JobStatus.PAUSED)) {
log.warn("pauseJob failed, jobId: {} is already paused", jobId);
return;
}
pauseJob(job);
}
public void setJobLatestStatus(long jobId, boolean status) {
Job job = jobMap.get(jobId);
if (job == null) {
log.warn("setJobLatestStatus failed, jobId: {} not exist", jobId);
return;
}
job.setLastExecuteTaskStatus(status);
}
public void stopJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
Optional<Job> optionalJob = findJob(dbName, jobName, jobCategory);
if (!optionalJob.isPresent()) {
throw new DdlException("Job " + jobName + " not exist in db " + dbName);
}
Job job = optionalJob.get();
if (job.getJobStatus().equals(JobStatus.STOPPED)) {
throw new DdlException("Job " + jobName + " is already stopped");
}
stopJob(optionalJob.get());
Env.getCurrentEnv().getEditLog().logDeleteJob(optionalJob.get());
}
private void stopJob(Job job) {
if (JobStatus.RUNNING.equals(job.getJobStatus())) {
cancelJobAllTask(job.getJobId());
}
job.setJobStatus(JobStatus.STOPPED);
jobMap.get(job.getJobId()).stop();
Env.getCurrentEnv().getEditLog().logDeleteJob(job);
Env.getCurrentEnv().getJobTaskManager().deleteJobTasks(job.getJobId());
}
public void resumeJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
Optional<Job> optionalJob = findJob(dbName, jobName, jobCategory);
if (!optionalJob.isPresent()) {
throw new DdlException("Job " + jobName + " not exist in db " + dbName);
}
Job job = optionalJob.get();
if (!job.getJobStatus().equals(JobStatus.PAUSED)) {
throw new DdlException("Job " + jobName + " is not paused");
}
resumeJob(job);
}
private void resumeJob(Job job) {
cancelJobAllTask(job.getJobId());
job.setJobStatus(JobStatus.RUNNING);
jobMap.get(job.getJobId()).resume();
initAndSchedulerJob(job);
Env.getCurrentEnv().getEditLog().logUpdateJob(job);
}
public void pauseJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
Optional<Job> optionalJob = findJob(dbName, jobName, jobCategory);
if (!optionalJob.isPresent()) {
throw new DdlException("Job " + jobName + " not exist in db " + dbName);
}
Job job = optionalJob.get();
if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
throw new DdlException("Job " + jobName + " is not running");
}
pauseJob(job);
}
private void pauseJob(Job job) {
cancelJobAllTask(job.getJobId());
job.setJobStatus(JobStatus.PAUSED);
jobMap.get(job.getJobId()).pause();
Env.getCurrentEnv().getEditLog().logUpdateJob(job);
}
public void finishJob(long jobId) {
Job job = jobMap.get(jobId);
if (job == null) {
log.warn("finishJob failed, jobId: {} not exist", jobId);
return;
}
if (job.getJobStatus().equals(JobStatus.FINISHED)) {
return;
}
job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis());
cancelJobAllTask(job.getJobId());
job.setJobStatus(JobStatus.FINISHED);
Env.getCurrentEnv().getEditLog().logUpdateJob(job);
}
private Optional<Job> findJob(String dbName, String jobName, JobCategory jobCategory) {
return jobMap.values().stream().filter(job -> checkJobMatch(job, dbName, jobName, jobCategory)).findFirst();
}
private boolean checkJobMatch(Job job, String dbName, String jobName, JobCategory jobCategory) {
return job.getDbName().equals(dbName) && job.getJobName().equals(jobName)
&& job.getJobCategory().equals(jobCategory);
}
public void resumeJob(Long jobId) {
if (jobMap.get(jobId) == null) {
log.warn("resumeJob failed, jobId: {} not exist", jobId);
return;
}
Job job = jobMap.get(jobId);
resumeJob(job);
}
public void stopJob(Long jobId) {
Job job = jobMap.get(jobId);
if (null == job) {
log.warn("stopJob failed, jobId: {} not exist", jobId);
return;
}
if (job.getJobStatus().equals(JobStatus.STOPPED)) {
log.warn("stopJob failed, jobId: {} is already stopped", jobId);
return;
}
stopJob(job);
}
public Job getJob(Long jobId) {
return jobMap.get(jobId);
}
public Map<Long, Job> getAllJob() {
return jobMap;
}
public void batchSchedulerTasks() {
executeJobIdsWithinLastTenMinutesWindow();
}
private List<Long> findTasksBetweenTime(Job job, Long endTimeEndWindow, Long nextExecuteTime, JobType jobType) {
List<Long> jobExecuteTimes = new ArrayList<>();
if (!jobType.equals(JobType.RECURRING) && (nextExecuteTime < endTimeEndWindow)) {
jobExecuteTimes.add(nextExecuteTime);
return jobExecuteTimes;
}
if (jobType.equals(JobType.RECURRING) && (nextExecuteTime > endTimeEndWindow)) {
return new ArrayList<>();
}
while (endTimeEndWindow >= nextExecuteTime) {
if (job.isTaskTimeExceeded()) {
break;
}
jobExecuteTimes.add(nextExecuteTime);
nextExecuteTime = job.getExecuteTimestampAndGeneratorNext();
}
return jobExecuteTimes;
}
/**
* Fetch the tasks in the next time window and hand them to the time wheel for timed triggering.
*/
private void executeJobIdsWithinLastTenMinutesWindow() {
// if the scheduler has fallen behind (the last window end is in the past),
// catch lastBatchSchedulerTimestamp up to the current time before advancing the window
if (lastBatchSchedulerTimestamp < System.currentTimeMillis()) {
this.lastBatchSchedulerTimestamp = System.currentTimeMillis();
}
this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
if (jobMap.isEmpty()) {
return;
}
jobMap.forEach((k, v) -> {
if (!v.getJobType().equals(JobType.MANUAL) && v.isRunning() && (v.getNextExecuteTimeMs()
+ v.getIntervalMs() < lastBatchSchedulerTimestamp)) {
List<Long> executeTimes = findTasksBetweenTime(
v, lastBatchSchedulerTimestamp,
v.getNextExecuteTimeMs(), v.getJobType());
if (!executeTimes.isEmpty()) {
for (Long executeTime : executeTimes) {
putOneTask(v.getJobId(), executeTime);
}
}
}
});
}
/**
* We re-run batch scheduling of system tasks every 10 minutes.
* Jobs are re-registered with the time wheel after each cycle completes.
*/
private void cycleSystemSchedulerTasks() {
log.info("re-register system scheduler tasks" + TimeUtils.longToTimeString(System.currentTimeMillis()));
dorisTimer.newTimeout(timeout -> {
batchSchedulerTasks();
clearFinishJob();
cycleSystemSchedulerTasks();
}, BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
/**
* Put one task into the time wheel; it will be triggered after the delay.
* If the scheduler is closed, the task will not be put into the time wheel.
* If the delay is less than 0, the task will be triggered immediately.
*
* @param jobId job id, used to find the job
* @param startExecuteTime the time at which the task should trigger, in milliseconds; it is converted to a
* delay in seconds, so triggering is only second-precise
*/
public void putOneTask(Long jobId, Long startExecuteTime) {
if (isClosed) {
log.info("putOneTask failed, scheduler is closed, jobId: {}", jobId);
return;
}
JobTask jobTask = createAsyncInitialTask(jobId, startExecuteTime);
long taskId = jobTask.getTaskId();
TimerJobTask task = new TimerJobTask(jobId, taskId, startExecuteTime, disruptor);
long delay = getDelaySecond(task.getStartTimestamp());
Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS);
if (timeout == null) {
log.error("putOneTask failed, jobId: {}", task.getJobId());
return;
}
jobTimeoutMap.computeIfAbsent(task.getJobId(), k -> new ConcurrentHashMap<>())
.put(task.getTaskId(), timeout);
JobTaskManager.addPrepareTask(jobTask);
}
// cancel all tasks for one job
// a task that has already started cannot be canceled
public void cancelJobAllTask(Long jobId) {
if (!jobTimeoutMap.containsKey(jobId)) {
return;
}
jobTimeoutMap.get(jobId).values().forEach(timeout -> {
if (!timeout.isExpired() && !timeout.isCancelled()) {
timeout.cancel();
}
});
JobTaskManager.clearPrepareTaskByJobId(jobId);
}
// get delay time, if startTimestamp is less than now, return 0
private long getDelaySecond(long startTimestamp) {
long delay = 0;
long now = System.currentTimeMillis();
if (startTimestamp > now) {
delay = startTimestamp - now;
} else {
// the execute time is already in the past, so return 0 and execute immediately
log.warn("startTimestamp is less than now, startTimestamp: {}, now: {}", startTimestamp, now);
return delay;
}
return delay / 1000;
}
@Override
public void close() throws IOException {
isClosed = true;
dorisTimer.stop();
disruptor.close();
}
/**
* Query jobs matching the given filters.
*
* @param dbFullName database name
* @param jobName job name
* @param category job category
* @param matcher job name matcher
*/
public List<Job> queryJob(String dbFullName, String jobName, JobCategory category, PatternMatcher matcher) {
List<Job> jobs = new ArrayList<>();
jobMap.values().forEach(job -> {
if (matchJob(job, dbFullName, jobName, category, matcher)) {
jobs.add(job);
}
});
return jobs;
}
private boolean matchJob(Job job, String dbFullName, String jobName, JobCategory category, PatternMatcher matcher) {
if (StringUtils.isNotBlank(dbFullName) && !job.getDbName().equalsIgnoreCase(dbFullName)) {
return false;
}
if (StringUtils.isNotBlank(jobName) && !job.getJobName().equalsIgnoreCase(jobName)) {
return false;
}
if (category != null && !job.getJobCategory().equals(category)) {
return false;
}
return null == matcher || matcher.match(job.getJobName());
}
public void putOneJobToQueue(Long jobId) {
JobTask jobTask = createInitialTask(jobId, null);
JobTaskManager.addPrepareTask(jobTask);
disruptor.tryPublish(jobId, jobTask.getTaskId());
}
private JobTask createAsyncInitialTask(long jobId, long createTimeMs) {
long taskId = System.nanoTime();
return new JobTask(jobId, taskId, createTimeMs);
}
private <T> JobTask createInitialTask(long jobId, T context) {
long taskId = System.nanoTime();
return new JobTask(jobId, taskId, System.currentTimeMillis(), context);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(jobMap.size());
for (Job job : jobMap.values()) {
job.write(out);
}
}
/**
* read job from data input, and init job
*
* @param in data input
* @throws IOException io exception when read data input error
*/
public void readFields(DataInput in) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; i++) {
Job job = Job.readFields(in);
jobMap.putIfAbsent(job.getJobId(), job);
}
}
/**
* Clear finished jobs: if a job's finish time is older than Config.finish_job_max_saved_second, delete it.
* This method is called every 10 minutes, so the actual maximum retention
* is Config.finish_job_max_saved_second + 10 minutes.
* We could delete jobs exactly on time, but that precision is not worth the cost.
*/
private void clearFinishJob() {
Long now = System.currentTimeMillis();
jobMap.values().forEach(job -> {
// the config value is in seconds, so convert to milliseconds before comparing
if (job.isFinished() && now - job.getLatestCompleteExecuteTimeMs() > Config.finish_job_max_saved_second * 1000L) {
jobMap.remove(job.getJobId());
Env.getCurrentEnv().getEditLog().logDeleteJob(job);
Env.getCurrentEnv().getJobTaskManager().deleteJobTasks(job.getJobId());
log.debug("delete finish job:{}", job.getJobId());
}
});
}
}
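The heart of the batch scheduler above is the window scan: every 10 minutes lastBatchSchedulerTimestamp advances by one window, and findTasksBetweenTime enumerates each running job's fire times that fall inside it. A self-contained sketch of that enumeration for a recurring job:

import java.util.ArrayList;
import java.util.List;

final class WindowScanSketch {
    // Enumerate a recurring job's fire times up to the window end, advancing
    // by intervalMs each step, as findTasksBetweenTime does via
    // getExecuteTimestampAndGeneratorNext above.
    static List<Long> fireTimesInWindow(long nextExecuteTime, long intervalMs, long windowEnd) {
        List<Long> fireTimes = new ArrayList<>();
        while (nextExecuteTime <= windowEnd) {
            fireTimes.add(nextExecuteTime);
            nextExecuteTime += intervalMs;
        }
        return fireTimes;
    }

    public static void main(String[] args) {
        long now = 0L;
        long windowEnd = now + 600_000L - 1;  // a 10-minute window
        // a job firing every 4 minutes lands at 0, 240000 and 480000 in the window
        System.out.println(fireTimesInWindow(now, 240_000L, windowEnd));
    }
}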


@@ -43,6 +43,8 @@ public class TransientTaskManager {
private TaskDisruptor disruptor;
public TransientTaskManager() {
disruptor = new TaskDisruptor(this);
disruptor.start();
}
public TransientTaskExecutor getMemoryTaskExecutor(Long taskId) {


@@ -1,136 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.registry;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.executor.JobExecutor;
import org.apache.doris.scheduler.job.Job;
import java.io.IOException;
import java.util.List;
/**
* This interface provides a contract for registering timed scheduling events.
* The implementation should trigger events in a timely manner using a specific algorithm.
* The execution of the events may be asynchronous and not guarantee strict timing accuracy.
*/
public interface PersistentJobRegister {
/**
* Register a job.
*
* @param name job name; it is not unique
* @param intervalMs job interval, unit: ms
* @param executor job executor, see {@link JobExecutor}
* @return job id
*/
Long registerJob(String name, Long intervalMs, JobExecutor executor) throws DdlException;
/**
* Register a job.
*
* @param name job name; it is not unique
* @param intervalMs job interval, unit: ms
* @param startTimeStamp job start time stamp, unit: ms;
* if startTimeStamp is null, the job will start immediately in the next cycle;
* startTimeStamp should be greater than the current time
* @param executor job executor, see {@link JobExecutor}
* @return job id
*/
Long registerJob(String name, Long intervalMs, Long startTimeStamp, JobExecutor executor) throws DdlException;
/**
* Register a job.
*
* @param name job name; it is not unique
* @param intervalMs job interval, unit: ms
* @param startTimeStamp job start time stamp, unit: ms;
* if startTimeStamp is null, the job will start immediately in the next cycle;
* startTimeStamp should be greater than the current time
* @param endTimeStamp job end time stamp, unit: ms;
* if endTimeStamp is null, the job will never stop;
* endTimeStamp must be greater than startTimeStamp and greater than the current time
* @param executor job executor, see {@link JobExecutor}
* @return job id
*/
Long registerJob(String name, Long intervalMs, Long startTimeStamp, Long endTimeStamp,
JobExecutor executor) throws DdlException;
/**
* If the job is running, pause it.
* Pausing means the job will not be executed in the next cycle, but the current cycle is not interrupted.
* A paused job can be resumed by {@link #resumeJob(Long)}.
*
* @param jobId job id; if the job does not exist, nothing happens
*/
void pauseJob(Long jobId);
void pauseJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException;
void resumeJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException;
/**
* If the job is running, stop it.
* Stopping means the job will not be executed in the next cycle and the current cycle is interrupted.
* A stopped job cannot be resumed; to run it again, register it again.
* Stopped jobs are deleted.
*
* @param jobId job id
*/
void stopJob(Long jobId);
void stopJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException;
/**
* If the job is paused, resume it.
*
* @param jobId job id
*/
void resumeJob(Long jobId);
Long registerJob(Job job) throws DdlException;
/**
* Execute a job task immediately. This method does not change the job status and does not affect the
* scheduled job; the task type should be set to
* {@link org.apache.doris.scheduler.constants.TaskType#MANUAL_JOB_TASK}.
*
* @param jobId job id
* @param contextData parameters to pass to the task, if any
* @param <T> context data type
* @return true if the execution was submitted successfully; false if the job does not exist,
* is not running, or does not support manual execution
*/
<T> boolean immediateExecuteTask(Long jobId, T contextData) throws DdlException;
List<Job> getJobs(String dbFullName, String jobName, JobCategory jobCategory, PatternMatcher matcher);
/**
* Close the job scheduler register.
* After closing, no new jobs are accepted. Jobs that have not reached their trigger time will not be
* executed; jobs that have already triggered are given 5 seconds to finish and are abandoned after that.
*/
void close() throws IOException;
}
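For orientation, a hedged usage sketch of the lifecycle this interface defines. Only methods declared above are called; the concrete register and executor are supplied by the caller (e.g. TimerJobRegister below with a SqlJobExecutor), and the job name is illustrative:

import org.apache.doris.common.DdlException;
import org.apache.doris.scheduler.executor.JobExecutor;
import org.apache.doris.scheduler.registry.PersistentJobRegister;

public class RegisterLifecycleSketch {
    static void lifecycle(PersistentJobRegister register, JobExecutor executor) throws DdlException {
        Long jobId = register.registerJob("hourly_report",
                60 * 60 * 1000L,   // intervalMs: one hour
                null,              // startTimeStamp: start in the next cycle
                null,              // endTimeStamp: never stop
                executor);
        register.pauseJob(jobId);  // future cycles are skipped; the current one finishes
        register.resumeJob(jobId); // scheduling resumes
        register.stopJob(jobId);   // terminal: to run again, register the job again
    }
}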


@@ -1,115 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.registry;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.executor.JobExecutor;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.manager.TimerJobManager;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.List;
/**
* This class registers timed scheduling events using the Netty time wheel algorithm to trigger events in a
* timely manner. After an event is triggered, it is produced by the Disruptor producer and consumed
* asynchronously; this model does not guarantee strict timing accuracy.
*/
@Slf4j
public class TimerJobRegister implements PersistentJobRegister {
private final TimerJobManager timerJobManager;
public TimerJobRegister(TimerJobManager timerJobManager) {
this.timerJobManager = timerJobManager;
}
@Override
public Long registerJob(String name, Long intervalMs, JobExecutor executor) throws DdlException {
return this.registerJob(name, intervalMs, null, null, executor);
}
@Override
public Long registerJob(String name, Long intervalMs, Long startTimeMs, JobExecutor executor) throws DdlException {
return this.registerJob(name, intervalMs, startTimeMs, null, executor);
}
@Override
public Long registerJob(String name, Long intervalMs, Long startTimeMs, Long endTimeStamp,
JobExecutor executor) throws DdlException {
Job job = new Job(name, intervalMs, startTimeMs, endTimeStamp, executor);
return timerJobManager.registerJob(job);
}
@Override
public Long registerJob(Job job) throws DdlException {
return timerJobManager.registerJob(job);
}
@Override
public <T> boolean immediateExecuteTask(Long jobId, T data) throws DdlException {
return timerJobManager.immediateExecuteTask(jobId, data);
}
@Override
public void pauseJob(Long jobId) {
timerJobManager.pauseJob(jobId);
}
@Override
public void pauseJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
timerJobManager.pauseJob(dbName, jobName, jobCategory);
}
@Override
public void resumeJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
timerJobManager.resumeJob(dbName, jobName, jobCategory);
}
@Override
public void stopJob(Long jobId) {
timerJobManager.stopJob(jobId);
}
@Override
public void stopJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
timerJobManager.stopJob(dbName, jobName, jobCategory);
}
@Override
public void resumeJob(Long jobId) {
timerJobManager.resumeJob(jobId);
}
@Override
public List<Job> getJobs(String dbFullName, String jobName, JobCategory jobCategory, PatternMatcher matcher) {
return timerJobManager.queryJob(dbFullName, jobName, jobCategory, matcher);
}
@Override
public void close() throws IOException {
timerJobManager.close();
}
}