[Feature](Job)Support scheduler job (#21916)

This commit is contained in:
Calvin Kirs
2023-08-02 21:34:43 +08:00
committed by GitHub
parent 6f575cf4b3
commit e5028314bc
45 changed files with 2499 additions and 398 deletions

View File

@ -1520,6 +1520,9 @@ public class Config extends ConfigBase {
@ConfField
public static boolean enable_pipeline_load = false;
@ConfField
public static int scheduler_job_task_max_num = 10;
// enable_workload_group should be immutable and temporarily set to mutable during the development test phase
@ConfField(mutable = true, expType = ExperimentalType.EXPERIMENTAL)
public static boolean enable_workload_group = false;

View File

@ -260,6 +260,7 @@ terminal String
KW_AUTO_INCREMENT,
KW_AS,
KW_ASC,
KW_AT,
KW_AUTHORS,
KW_BACKEND,
KW_BACKENDS,
@ -342,6 +343,7 @@ terminal String
KW_DISTRIBUTED,
KW_DISTRIBUTION,
KW_DIV,
KW_DO,
KW_DOUBLE,
KW_DROP,
KW_DROPP,
@ -352,11 +354,13 @@ terminal String
KW_ENCRYPTKEY,
KW_ENCRYPTKEYS,
KW_END,
KW_ENDS,
KW_ENGINE,
KW_ENGINES,
KW_ENTER,
KW_ERRORS,
KW_EVENTS,
KW_EVERY,
KW_EXCEPT,
KW_EXCLUDE,
KW_EXISTS,
@ -423,6 +427,7 @@ terminal String
KW_ISOLATION,
KW_INVERTED,
KW_JOB,
KW_JOBS,
KW_JOIN,
KW_JSON,
KW_JSONB,
@ -551,6 +556,7 @@ terminal String
KW_ROWS,
KW_S3,
KW_SAMPLE,
KW_SCHEDULER,
KW_SCHEMA,
KW_SCHEMAS,
KW_SECOND,
@ -569,11 +575,13 @@ terminal String
KW_SPLIT,
KW_SQL_BLOCK_RULE,
KW_START,
KW_STARTS,
KW_STATS,
KW_STATUS,
KW_STOP,
KW_STORAGE,
KW_STREAM,
KW_STREAMING,
KW_STRING,
KW_STRUCT,
KW_SUM,
@ -587,6 +595,7 @@ terminal String
KW_TABLET,
KW_TABLETS,
KW_TASK,
KW_TASKS,
KW_TEMPORARY,
KW_TERMINATED,
KW_TEXT,
@ -662,6 +671,7 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt,
create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt,
show_routine_load_stmt, show_routine_load_task_stmt, show_create_routine_load_stmt, show_create_load_stmt, show_create_reporitory_stmt,
describe_stmt, alter_stmt,
create_job_stmt,pause_job_stmt,resume_job_stmt,stop_job_stmt,show_job_stmt,
use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
switch_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt,
import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt,
@ -796,6 +806,8 @@ nonterminal Qualifier opt_set_qualifier;
nonterminal Operation set_op;
nonterminal ArrayList<String> opt_common_hints;
nonterminal String optional_on_ident;
nonterminal String opt_job_starts;
nonterminal String opt_job_ends;
nonterminal LoadTask.MergeType opt_merge_type, opt_with_merge_type;
@ -1128,6 +1140,16 @@ stmt ::=
{: RESULT = stmt; :}
| show_create_routine_load_stmt : stmt
{: RESULT = stmt; :}
| create_job_stmt : stmt
{: RESULT = stmt; :}
| pause_job_stmt : stmt
{: RESULT = stmt; :}
| show_job_stmt : stmt
{: RESULT = stmt; :}
| stop_job_stmt : stmt
{: RESULT = stmt; :}
| resume_job_stmt : stmt
{: RESULT = stmt; :}
| show_create_load_stmt : stmt
{: RESULT = stmt; :}
| show_create_reporitory_stmt : stmt
@ -2611,7 +2633,75 @@ resource_desc ::=
RESULT = new ResourceDesc(resourceName, properties);
:}
;
// CREATE JOB: three schedule forms are accepted --
//   EVERY <n> <unit> [STARTS ts] [ENDS ts]  : cyclic job
//   STREAMING AT <ts>                        : streaming job started at ts
//   AT <ts>                                  : one-shot job
create_job_stmt ::=
KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_EVERY INTEGER_LITERAL:time_interval ident:time_unit opt_job_starts:startsTime opt_job_ends:endsTime opt_comment:comment KW_DO stmt:executeSql
{:
CreateJobStmt stmt = new CreateJobStmt(jobLabel,null,false,time_interval,time_unit, startsTime, endsTime,comment,executeSql);
RESULT = stmt;
:}
| KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_STREAMING KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql
{:
CreateJobStmt stmt = new CreateJobStmt(jobLabel,atTime,true,null,null,null,null,comment,executeSql);
RESULT = stmt;
:}
| KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql
{:
CreateJobStmt stmt = new CreateJobStmt(jobLabel,atTime,false,null,null,null,null,comment,executeSql);
RESULT = stmt;
:}
;
// Optional STARTS <timestamp> clause of a cyclic job; null when absent.
opt_job_starts ::=
{:
RESULT = null;
:}
| KW_STARTS STRING_LITERAL:startTime
{:
RESULT = startTime;
:}
;
// Optional ENDS <timestamp> clause of a cyclic job; null when absent.
opt_job_ends ::=
{:
RESULT = null;
:}
| KW_ENDS STRING_LITERAL:endTime
{:
RESULT = endTime;
:}
;
// RESUME JOB <label>: resume a paused job.
resume_job_stmt ::=
KW_RESUME KW_JOB job_label:jobLabel
{:
RESULT = new ResumeJobStmt(jobLabel);
:}
;
// SHOW JOBS | SHOW JOB FOR <label> | SHOW JOB TASKS FOR <label>
show_job_stmt ::=
KW_SHOW KW_JOBS
{:
RESULT = new ShowJobStmt(null,null);
:}
| KW_SHOW KW_JOB KW_FOR job_label:jobLabel
{:
RESULT = new ShowJobStmt(jobLabel,null);
:}
| KW_SHOW KW_JOB KW_TASKS KW_FOR job_label:jobLabel
{:
RESULT = new ShowJobTaskStmt(jobLabel);
:}
;
// PAUSE JOB FOR <label>: pause a running job.
pause_job_stmt ::=
KW_PAUSE KW_JOB KW_FOR job_label:jobLabel
{:
RESULT = new PauseJobStmt(jobLabel);
:}
;
// STOP JOB FOR <label>: permanently stop a job (cannot be resumed).
stop_job_stmt ::=
KW_STOP KW_JOB KW_FOR job_label:jobLabel
{:
RESULT = new StopJobStmt(jobLabel);
:}
;
// Routine load statement
create_routine_load_stmt ::=
KW_CREATE KW_ROUTINE KW_LOAD job_label:jobLabel optional_on_ident:tableName
@ -7347,6 +7437,8 @@ keyword ::=
{: RESULT = id; :}
| KW_DISTINCTPCSA:id
{: RESULT = id; :}
| KW_DO:id
{: RESULT = id; :}
| KW_BUCKETS:id
{: RESULT = id; :}
| KW_FAST:id
@ -7411,6 +7503,8 @@ keyword ::=
{: RESULT = id; :}
| KW_JOB:id
{: RESULT = id; :}
| KW_JOBS:id
{: RESULT = id; :}
| KW_JSON:id
{: RESULT = id; :}
| KW_JSONB:id
@ -7523,7 +7617,7 @@ keyword ::=
{: RESULT = id; :}
| KW_ROLLBACK:id
{: RESULT = id; :}
| KW_ROLLUP:id
| KW_SCHEDULER:id
{: RESULT = id; :}
| KW_SCHEMA:id
{: RESULT = id; :}
@ -7549,6 +7643,8 @@ keyword ::=
{: RESULT = id; :}
| KW_STREAM:id
{: RESULT = id; :}
| KW_STREAMING:id
{: RESULT = id; :}
| KW_STRUCT:id
{: RESULT = id; :}
| KW_STRING:id
@ -7615,8 +7711,20 @@ keyword ::=
{: RESULT = id; :}
| KW_TASK:id
{: RESULT = id; :}
| KW_TASKS:id
{: RESULT = id; :}
| KW_ROUTINE:id
{: RESULT = id; :}
| KW_EVERY:id
{: RESULT = id; :}
| KW_AT:id
{: RESULT = id; :}
| KW_ROLLUP:id
{: RESULT = id; :}
| KW_STARTS:id
{: RESULT = id; :}
| KW_ENDS:id
{: RESULT = id; :}
| KW_PAUSE:id
{: RESULT = id; :}
| KW_RESUME:id

View File

@ -0,0 +1,209 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.scheduler.common.IntervalUnit;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.constants.JobStatus;
import org.apache.doris.scheduler.executor.SqlJobExecutor;
import org.apache.doris.scheduler.job.Job;
import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
/**
 * Statement for creating a scheduler job.
 *
 * syntax:
 * CREATE
 *     [DEFINER = user]
 *     JOB
 *     event_name
 *     ON SCHEDULER schedule
 *     [COMMENT 'string']
 *     DO event_body;
 * schedule: {
 *     [STREAMING] AT timestamp
 *     | EVERY interval
 *         [STARTS timestamp]
 *         [ENDS timestamp]
 * }
 * interval:
 *     quantity { DAY | HOUR | MINUTE | WEEK | SECOND }
 */
@Slf4j
public class CreateJobStmt extends DdlStmt {

    @Getter
    private StatementBase stmt;

    @Getter
    private Job job;

    private final LabelName labelName;
    // Execution timestamp of a one-shot (AT ...) job; blank for cyclic (EVERY ...) jobs.
    private final String onceJobStartTimestamp;
    private final Long interval;
    private final String intervalTimeUnit;
    private final String startsTimeStamp;
    private final String endsTimeStamp;
    private final String comment;
    private String timezone = TimeUtils.DEFAULT_TIME_ZONE;

    // Statement types that are allowed as a job body (currently only native INSERT).
    private static final ImmutableSet<String> supportStmtClassName = new ImmutableSet.Builder<String>()
            .add(NativeInsertStmt.class.getName()).build();

    public CreateJobStmt(LabelName labelName, String onceJobStartTimestamp, Boolean isStreamingJob,
                         Long interval, String intervalTimeUnit,
                         String startsTimeStamp, String endsTimeStamp, String comment, StatementBase doStmt) {
        this.labelName = labelName;
        this.onceJobStartTimestamp = onceJobStartTimestamp;
        this.interval = interval;
        this.intervalTimeUnit = intervalTimeUnit;
        this.startsTimeStamp = startsTimeStamp;
        this.endsTimeStamp = endsTimeStamp;
        this.comment = comment;
        this.stmt = doStmt;
        this.job = new Job();
        job.setStreamingJob(isStreamingJob);
    }

    /**
     * Extracts the executable SQL that follows the DO keyword from the original statement text.
     *
     * The keyword is located case-insensitively, but the returned SQL keeps the original letter
     * case. (The previous implementation returned a lower-cased copy of the body, which corrupted
     * case-sensitive identifiers and string literals in the job's SQL; it also called substring
     * with indexOf's -1 result when " do " was absent.)
     *
     * NOTE(review): this still splits on the FIRST " do " occurrence; a comment or label text
     * containing " do " would confuse it — inherited limitation of the original parsing.
     */
    private String parseExecuteSql(String sql) throws AnalysisException {
        int executeSqlIndex = sql.toLowerCase().indexOf(" do ");
        if (executeSqlIndex < 0) {
            throw new AnalysisException("execute sql has invalid format");
        }
        String executeSql = sql.substring(executeSqlIndex + 4).trim();
        if (StringUtils.isBlank(executeSql)) {
            throw new AnalysisException("execute sql has invalid format");
        }
        return executeSql;
    }

    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        super.analyze(analyzer);
        checkAuth();
        labelName.analyze(analyzer);
        String dbName = labelName.getDbName();
        // Ensure the target database exists before registering the job on it.
        Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
        job.setDbName(labelName.getDbName());
        job.setJobName(labelName.getLabelName());
        if (StringUtils.isNotBlank(onceJobStartTimestamp)) {
            analyzerOnceTimeJob();
        } else {
            analyzerCycleJob();
        }
        if (ConnectContext.get() != null) {
            timezone = ConnectContext.get().getSessionVariable().getTimeZone();
        }
        timezone = TimeUtils.checkTimeZoneValidAndStandardize(timezone);
        job.setTimezone(timezone);
        job.setComment(comment);
        //todo support user define
        job.setUser("root");
        job.setJobStatus(JobStatus.RUNNING);
        job.setJobCategory(JobCategory.SQL);
        analyzerSqlStmt();
    }

    // Only root may create jobs for now (see the "user define" todo above).
    private void checkAuth() throws AnalysisException {
        UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity();
        if (!userIdentity.isRootUser()) {
            throw new AnalysisException("only root user can create job");
        }
    }

    // Validates the DO body and attaches a SQL executor built from the original statement text.
    private void analyzerSqlStmt() throws UserException {
        if (!supportStmtClassName.contains(stmt.getClass().getName())) {
            throw new AnalysisException("Not support stmt type");
        }
        stmt.analyze(analyzer);
        String originStmt = getOrigStmt().originStmt;
        String executeSql = parseExecuteSql(originStmt);
        SqlJobExecutor sqlJobExecutor = new SqlJobExecutor(executeSql);
        job.setExecutor(sqlJobExecutor);
    }

    // Validates an EVERY-interval job: interval value, interval unit, optional STARTS/ENDS window.
    private void analyzerCycleJob() throws UserException {
        job.setCycleJob(true);
        if (null == interval) {
            throw new AnalysisException("interval is null");
        }
        if (interval <= 0) {
            throw new AnalysisException("interval must be greater than 0");
        }
        if (StringUtils.isBlank(intervalTimeUnit)) {
            throw new AnalysisException("intervalTimeUnit is null");
        }
        try {
            IntervalUnit intervalUnit = IntervalUnit.valueOf(intervalTimeUnit.toUpperCase());
            job.setIntervalUnit(intervalUnit);
            long intervalTimeMs = intervalUnit.getParameterValue(interval);
            job.setIntervalMs(intervalTimeMs);
            job.setOriginInterval(interval);
        } catch (IllegalArgumentException e) {
            throw new AnalysisException("interval time unit is not valid, we only support second,minute,hour,day,week");
        }
        if (StringUtils.isNotBlank(startsTimeStamp)) {
            long startsTimeMillis = TimeUtils.timeStringToLong(startsTimeStamp);
            if (startsTimeMillis < System.currentTimeMillis()) {
                throw new AnalysisException("starts time must be greater than current time");
            }
            job.setStartTimeMs(startsTimeMillis);
        }
        if (StringUtils.isNotBlank(endsTimeStamp)) {
            long endTimeMillis = TimeUtils.timeStringToLong(endsTimeStamp);
            if (endTimeMillis < System.currentTimeMillis()) {
                throw new AnalysisException("ends time must be greater than current time");
            }
            job.setEndTimeMs(endTimeMillis);
        }
        // The STARTS..ENDS window must be at least one interval wide for the job to ever fire.
        if (job.getStartTimeMs() > 0 && job.getEndTimeMs() > 0
                && (job.getEndTimeMs() - job.getStartTimeMs() < job.getIntervalMs())) {
            throw new AnalysisException("ends time must be greater than start time and interval time");
        }
    }

    // Validates a one-shot (AT timestamp) job; the timestamp must lie in the future.
    private void analyzerOnceTimeJob() throws UserException {
        job.setCycleJob(false);
        job.setIntervalMs(0L);
        long executeAtTimeMillis = TimeUtils.timeStringToLong(onceJobStartTimestamp);
        if (executeAtTimeMillis < System.currentTimeMillis()) {
            throw new AnalysisException("job time stamp must be greater than current time");
        }
        job.setStartTimeMs(executeAtTimeMillis);
    }
}

View File

@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Strings;
/**
 * syntax:
 * PAUSE JOB FOR [database.]name
 * Pauses a job by its name. Only a running job can be paused; the pause takes
 * effect immediately. A paused job can be resumed with RESUME JOB FOR jobName.
 */
public class PauseJobStmt extends DdlStmt {

    private final LabelName labelName;
    private String db;

    public PauseJobStmt(LabelName labelName) {
        this.labelName = labelName;
    }

    /** True when no job label was supplied, i.e. the statement targets all jobs. */
    public boolean isAll() {
        return labelName == null;
    }

    /** Job name part of the label; callers should check {@link #isAll()} first. */
    public String getName() {
        return labelName.getLabelName();
    }

    /** Fully-qualified database name, resolved during analyze(). */
    public String getDbFullName() {
        return db;
    }

    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        super.analyze(analyzer);
        checkAuth();
        if (labelName == null) {
            // No label: fall back to the session's current database.
            if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
            }
            db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb());
            return;
        }
        labelName.analyze(analyzer);
        db = labelName.getDbName();
    }

    // Job management is restricted to the root user.
    private void checkAuth() throws AnalysisException {
        UserIdentity currentUser = ConnectContext.get().getCurrentUserIdentity();
        if (currentUser.isRootUser()) {
            return;
        }
        throw new AnalysisException("only root user can operate");
    }
}

View File

@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Strings;
/**
 * syntax:
 * RESUME JOB FOR [database.]name
 * Resumes a paused job by its name so it is scheduled again.
 */
public class ResumeJobStmt extends DdlStmt {

    private final LabelName labelName;
    private String db;

    public ResumeJobStmt(LabelName labelName) {
        this.labelName = labelName;
    }

    /** True when no job label was supplied, i.e. the statement targets all jobs. */
    public boolean isAll() {
        return labelName == null;
    }

    /** Job name part of the label; callers should check {@link #isAll()} first. */
    public String getName() {
        return labelName.getLabelName();
    }

    /** Fully-qualified database name, resolved during analyze(). */
    public String getDbFullName() {
        return db;
    }

    // Declaring "throws AnalysisException, UserException" was redundant:
    // AnalysisException is a subclass of UserException.
    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        super.analyze(analyzer);
        checkAuth();
        if (labelName != null) {
            labelName.analyze(analyzer);
            db = labelName.getDbName();
        } else {
            // No label: fall back to the session's current database.
            if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
            }
            db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb());
        }
    }

    // Job management is restricted to the root user.
    private void checkAuth() throws AnalysisException {
        UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity();
        if (!userIdentity.isRootUser()) {
            throw new AnalysisException("only root user can operate");
        }
    }
}

View File

@ -0,0 +1,130 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import java.util.List;
/**
 * SHOW JOBS
 *     returns all jobs in the connection's current database
 * SHOW JOB FOR [db.]jobName
 *     returns the job with the given name
 */
public class ShowJobStmt extends ShowStmt {

    private static final ImmutableList<String> TITLE_NAMES = ImmutableList.of(
            "Id", "Db", "Name", "Definer", "TimeZone", "ExecuteType", "ExecuteAt",
            "ExecuteInterval", "ExecuteIntervalUnit", "Starts", "Ends", "Status",
            "LastExecuteFinishTime", "ErrorMsg", "Comment");

    private final LabelName labelName;
    private String dbFullName; // resolved during analysis
    private String name;       // resolved during analysis; null for SHOW JOBS
    private String pattern;    // optional filter

    public ShowJobStmt(LabelName labelName, String pattern) {
        this.labelName = labelName;
        this.pattern = pattern;
    }

    public String getDbFullName() {
        return dbFullName;
    }

    public String getName() {
        return name;
    }

    public String getPattern() {
        return pattern;
    }

    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        super.analyze(analyzer);
        checkAuth();
        checkLabelName(analyzer);
    }

    // Job management is restricted to the root user.
    private void checkAuth() throws AnalysisException {
        UserIdentity currentUser = ConnectContext.get().getCurrentUserIdentity();
        if (currentUser.isRootUser()) {
            return;
        }
        throw new AnalysisException("only root user can operate");
    }

    // Resolves the target database (from the label or from the connection) and the job name.
    private void checkLabelName(Analyzer analyzer) throws AnalysisException {
        if (labelName != null && !Strings.isNullOrEmpty(labelName.getDbName())) {
            dbFullName = ClusterNamespace.getFullName(getClusterName(), labelName.getDbName());
        } else {
            dbFullName = analyzer.getContext().getDatabase();
            if (Strings.isNullOrEmpty(dbFullName)) {
                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
            }
        }
        name = labelName == null ? null : labelName.getLabelName();
    }

    public static List<String> getTitleNames() {
        return TITLE_NAMES;
    }

    @Override
    public ShowResultSetMetaData getMetaData() {
        ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
        TITLE_NAMES.forEach(title -> builder.addColumn(new Column(title, ScalarType.createVarchar(30))));
        return builder.build();
    }

    @Override
    public RedirectStatus getRedirectStatus() {
        return RedirectStatus.FORWARD_NO_SYNC;
    }
}

View File

@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import java.util.List;
/**
 * SHOW JOB TASKS FOR [db.]jobName
 * Shows the execution tasks of a scheduler job. A job name is mandatory.
 */
public class ShowJobTaskStmt extends ShowStmt {

    private static final ImmutableList<String> TITLE_NAMES =
            new ImmutableList.Builder<String>()
                    .add("JobId")
                    .add("TaskId")
                    .add("StartTime")
                    .add("EndTime")
                    .add("Status")
                    .add("ErrorMsg")
                    .build();

    private final LabelName labelName;
    private String dbFullName; // resolved during analysis
    private String name;       // resolved during analysis

    public ShowJobTaskStmt(LabelName labelName) {
        this.labelName = labelName;
    }

    public String getDbFullName() {
        return dbFullName;
    }

    public String getName() {
        return name;
    }

    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        super.analyze(analyzer);
        checkAuth();
        checkLabelName(analyzer);
    }

    // Job management is restricted to the root user.
    private void checkAuth() throws AnalysisException {
        UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity();
        if (!userIdentity.isRootUser()) {
            throw new AnalysisException("only root user can operate");
        }
    }

    // Resolves the target database (from the label or from the connection) and the job name.
    private void checkLabelName(Analyzer analyzer) throws AnalysisException {
        // Fail fast on a missing job name. Previously this was checked only AFTER the db
        // resolution, so a missing name in a session with no current database surfaced as
        // ERR_NO_DB_ERROR instead of the real problem.
        if (null == labelName) {
            throw new AnalysisException("Job name is null");
        }
        String dbName = labelName.getDbName();
        if (Strings.isNullOrEmpty(dbName)) {
            dbFullName = analyzer.getContext().getDatabase();
            if (Strings.isNullOrEmpty(dbFullName)) {
                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
            }
        } else {
            dbFullName = ClusterNamespace.getFullName(getClusterName(), dbName);
        }
        name = labelName.getLabelName();
    }

    public static List<String> getTitleNames() {
        return TITLE_NAMES;
    }

    @Override
    public ShowResultSetMetaData getMetaData() {
        ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
        for (String title : TITLE_NAMES) {
            builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
        }
        return builder.build();
    }

    @Override
    public RedirectStatus getRedirectStatus() {
        return RedirectStatus.FORWARD_NO_SYNC;
    }
}

View File

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
/**
 * syntax:
 * STOP JOB FOR [database.]name
 * Only a running job can be stopped, and a stopped job cannot be resumed.
 */
public class StopJobStmt extends DdlStmt {

    private final LabelName labelName;

    public StopJobStmt(LabelName labelName) {
        this.labelName = labelName;
    }

    /** Returns the job name part of the label. */
    public String getName() {
        return labelName.getLabelName();
    }

    /** Returns the database the job belongs to (populated by analyze()). */
    public String getDbFullName() {
        return labelName.getDbName();
    }

    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        super.analyze(analyzer);
        checkAuth();
        labelName.analyze(analyzer);
    }

    // Job management is restricted to the root user.
    private void checkAuth() throws AnalysisException {
        UserIdentity currentUser = ConnectContext.get().getCurrentUserIdentity();
        if (currentUser.isRootUser()) {
            return;
        }
        throw new AnalysisException("only root user can operate");
    }
}

View File

@ -211,6 +211,10 @@ import org.apache.doris.qe.JournalObservable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.scheduler.AsyncJobRegister;
import org.apache.doris.scheduler.manager.AsyncJobManager;
import org.apache.doris.scheduler.manager.JobTaskManager;
import org.apache.doris.scheduler.registry.PersistentJobRegister;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.StatisticsAutoAnalyzer;
@ -330,6 +334,9 @@ public class Env {
private CooldownConfHandler cooldownConfHandler;
private MetastoreEventsProcessor metastoreEventsProcessor;
private PersistentJobRegister persistentJobRegister;
private AsyncJobManager asyncJobManager;
private JobTaskManager jobTaskManager;
private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos
private MasterDaemon txnCleaner; // To clean aborted or timeout txns
private Daemon replayer;
@ -585,7 +592,9 @@ public class Env {
this.cooldownConfHandler = new CooldownConfHandler();
}
this.metastoreEventsProcessor = new MetastoreEventsProcessor();
this.jobTaskManager = new JobTaskManager();
this.asyncJobManager = new AsyncJobManager();
this.persistentJobRegister = new AsyncJobRegister(asyncJobManager);
this.replayedJournalId = new AtomicLong(0L);
this.stmtIdCounter = new AtomicLong(0L);
this.isElectable = false;
@ -1957,6 +1966,30 @@ public class Env {
return checksum;
}
// Replays the scheduler job manager state from a metadata image during FE startup.
// The checksum is passed through unchanged, matching the other image loaders.
public long loadAsyncJobManager(DataInputStream in, long checksum) throws IOException {
asyncJobManager.readFields(in);
LOG.info("finished replay asyncJobMgr from image");
return checksum;
}
// Persists the scheduler job manager state into a metadata image.
// The checksum is passed through unchanged, matching the other image savers.
public long saveAsyncJobManager(CountingDataOutputStream out, long checksum) throws IOException {
    asyncJobManager.write(out);
    // Fixed copy-paste defect: the message previously said "analysisMgr".
    LOG.info("finished save asyncJobMgr to image");
    return checksum;
}
// Replays the scheduler job-task manager state from a metadata image during FE startup.
// The checksum is passed through unchanged, matching the other image loaders.
public long loadJobTaskManager(DataInputStream in, long checksum) throws IOException {
jobTaskManager.readFields(in);
LOG.info("finished replay jobTaskMgr from image");
return checksum;
}
// Persists the scheduler job-task manager state into a metadata image.
// The checksum is passed through unchanged, matching the other image savers.
public long saveJobTaskManager(CountingDataOutputStream out, long checksum) throws IOException {
jobTaskManager.write(out);
LOG.info("finished save jobTaskMgr to image");
return checksum;
}
public long loadResources(DataInputStream in, long checksum) throws IOException {
resourceMgr = ResourceMgr.read(in);
LOG.info("finished replay resources from image");
@ -3726,6 +3759,18 @@ public class Env {
return this.syncJobManager;
}
public PersistentJobRegister getJobRegister() {
return persistentJobRegister;
}
public AsyncJobManager getAsyncJobManager() {
return asyncJobManager;
}
public JobTaskManager getJobTaskManager() {
return jobTaskManager;
}
public SmallFileMgr getSmallFileMgr() {
return this.smallFileMgr;
}

View File

@ -36,7 +36,8 @@ public enum CaseSensibility {
RESOURCE(true),
CONFIG(true),
ROUTINE_LOAD(true),
WORKLOAD_GROUP(true);
WORKLOAD_GROUP(true),
JOB(true);
private boolean caseSensitive;

View File

@ -22,5 +22,7 @@ public enum LogKey {
ROUTINE_LOAD_TASK,
LOAD_JOB,
LOAD_TASK,
SYNC_JOB
SYNC_JOB,
SCHEDULER_JOB,
SCHEDULER_TASK
}

View File

@ -118,6 +118,8 @@ import org.apache.doris.policy.DropPolicyLog;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
@ -518,6 +520,21 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_UPDATE_SCHEDULER_JOB:
case OperationType.OP_DELETE_SCHEDULER_JOB:
case OperationType.OP_CREATE_SCHEDULER_JOB: {
Job job = Job.readFields(in);
data = job;
isRead = true;
break;
}
case OperationType.OP_CREATE_SCHEDULER_TASK:
case OperationType.OP_DELETE_SCHEDULER_TASK: {
JobTask task = JobTask.readFields(in);
data = task;
isRead = true;
break;
}
case OperationType.OP_CREATE_LOAD_JOB: {
data = org.apache.doris.load.loadv2.LoadJob.read(in);
isRead = true;

View File

@ -81,6 +81,8 @@ import org.apache.doris.policy.DropPolicyLog;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
@ -644,6 +646,31 @@ public class EditLog {
Env.getCurrentEnv().getRoutineLoadManager().replayCreateRoutineLoadJob(routineLoadJob);
break;
}
case OperationType.OP_CREATE_SCHEDULER_JOB: {
Job job = (Job) journal.getData();
Env.getCurrentEnv().getAsyncJobManager().replayCreateJob(job);
break;
}
case OperationType.OP_UPDATE_SCHEDULER_JOB: {
Job job = (Job) journal.getData();
Env.getCurrentEnv().getAsyncJobManager().replayUpdateJob(job);
break;
}
case OperationType.OP_DELETE_SCHEDULER_JOB: {
Job job = (Job) journal.getData();
Env.getCurrentEnv().getAsyncJobManager().replayDeleteJob(job);
break;
}
case OperationType.OP_CREATE_SCHEDULER_TASK: {
JobTask task = (JobTask) journal.getData();
Env.getCurrentEnv().getJobTaskManager().replayCreateTask(task);
break;
}
case OperationType.OP_DELETE_SCHEDULER_TASK: {
JobTask task = (JobTask) journal.getData();
Env.getCurrentEnv().getJobTaskManager().replayDeleteTask(task);
break;
}
case OperationType.OP_CHANGE_ROUTINE_LOAD_JOB: {
RoutineLoadOperation operation = (RoutineLoadOperation) journal.getData();
Env.getCurrentEnv().getRoutineLoadManager().replayChangeRoutineLoadJob(operation);
@ -1536,6 +1563,26 @@ public class EditLog {
logEdit(OperationType.OP_CREATE_ROUTINE_LOAD_JOB, routineLoadJob);
}
public void logCreateJob(Job job) {
logEdit(OperationType.OP_CREATE_SCHEDULER_JOB, job);
}
public void logUpdateJob(Job job) {
logEdit(OperationType.OP_UPDATE_SCHEDULER_JOB, job);
}
public void logCreateJobTask(JobTask jobTask) {
logEdit(OperationType.OP_CREATE_SCHEDULER_TASK, jobTask);
}
public void logDeleteJobTask(JobTask jobTask) {
logEdit(OperationType.OP_DELETE_SCHEDULER_TASK, jobTask);
}
// Persist a scheduler-job deletion to the edit log for replay.
public void logDeleteJob(Job job) {
    logEdit(OperationType.OP_DELETE_SCHEDULER_JOB, job);
}
public void logOpRoutineLoadJob(RoutineLoadOperation routineLoadOperation) {
logEdit(OperationType.OP_CHANGE_ROUTINE_LOAD_JOB, routineLoadOperation);
}

View File

@ -313,6 +313,17 @@ public class OperationType {
// change an auto increment id for a column
public static final short OP_UPDATE_AUTO_INCREMENT_ID = 437;
// scheduler job
public static final short OP_CREATE_SCHEDULER_JOB = 450;
public static final short OP_UPDATE_SCHEDULER_JOB = 451;
public static final short OP_DELETE_SCHEDULER_JOB = 452;
public static final short OP_CREATE_SCHEDULER_TASK = 453;
public static final short OP_DELETE_SCHEDULER_TASK = 454;
/**
* Get opcode name by op code.
**/

View File

@ -81,6 +81,8 @@ import org.apache.doris.load.sync.canal.CanalSyncJob;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.RowPolicy;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.scheduler.executor.JobExecutor;
import org.apache.doris.scheduler.executor.SqlJobExecutor;
import org.apache.doris.system.BackendHbResponse;
import org.apache.doris.system.BrokerHbResponse;
import org.apache.doris.system.FrontendHbResponse;
@ -211,6 +213,10 @@ public class GsonUtils {
RuntimeTypeAdapterFactory.of(
AbstractDataSourceProperties.class, "clazz")
.registerSubtype(KafkaDataSourceProperties.class, KafkaDataSourceProperties.class.getSimpleName());
private static RuntimeTypeAdapterFactory<JobExecutor> jobExecutorRuntimeTypeAdapterFactory =
RuntimeTypeAdapterFactory.of(
JobExecutor.class, "clazz")
.registerSubtype(SqlJobExecutor.class, SqlJobExecutor.class.getSimpleName());
private static RuntimeTypeAdapterFactory<DatabaseIf> dbTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
DatabaseIf.class, "clazz")
@ -264,10 +270,25 @@ public class GsonUtils {
.registerTypeAdapterFactory(partitionInfoTypeAdapterFactory)
.registerTypeAdapterFactory(hbResponseTypeAdapterFactory)
.registerTypeAdapterFactory(rdsTypeAdapterFactory)
.registerTypeAdapterFactory(jobExecutorRuntimeTypeAdapterFactory)
.registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer())
.registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter())
.registerTypeAdapter(PartitionKey.class, new PartitionKey.PartitionKeySerializer())
.registerTypeAdapter(Range.class, new RangeUtils.RangeSerializer());
.registerTypeAdapter(Range.class, new RangeUtils.RangeSerializer()).setExclusionStrategies(
new ExclusionStrategy() {
@Override
public boolean shouldSkipField(FieldAttributes f) {
return false;
}
@Override
public boolean shouldSkipClass(Class<?> clazz) {
/* due to java.lang.IllegalArgumentException: com.lmax.disruptor.RingBuffer
<org.apache.doris.scheduler.disruptor.TimerTaskEvent> declares multiple
JSON fields named p1 */
return clazz.getName().startsWith("com.lmax.disruptor.RingBuffer");
}
});
private static final GsonBuilder GSON_BUILDER_PRETTY_PRINTING = GSON_BUILDER.setPrettyPrinting();

View File

@ -230,6 +230,18 @@ public class MetaPersistMethod {
metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveAnalysisMgr", CountingDataOutputStream.class, long.class);
break;
case "AsyncJobManager":
metaPersistMethod.readMethod =
Env.class.getDeclaredMethod("loadAsyncJobManager", DataInputStream.class, long.class);
metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveAsyncJobManager", CountingDataOutputStream.class, long.class);
break;
case "JobTaskManager":
metaPersistMethod.readMethod =
Env.class.getDeclaredMethod("loadJobTaskManager", DataInputStream.class, long.class);
metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveJobTaskManager", CountingDataOutputStream.class, long.class);
break;
default:
break;
}

View File

@ -39,7 +39,7 @@ public class PersistMetaModules {
"globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler",
"paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles",
"plugins", "deleteHandler", "sqlBlockRule", "policy", "mtmvJobManager", "globalFunction", "workloadGroups",
"binlogs", "resourceGroups", "AnalysisMgr");
"binlogs", "resourceGroups", "AnalysisMgr", "AsyncJobManager", "JobTaskManager");
// Modules in this list is deprecated and will not be saved in meta file. (also should not be in MODULE_NAMES)
public static final ImmutableList<String> DEPRECATED_MODULE_NAMES = ImmutableList.of(

View File

@ -58,6 +58,7 @@ import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateEncryptKeyStmt;
import org.apache.doris.analysis.CreateFileStmt;
import org.apache.doris.analysis.CreateFunctionStmt;
import org.apache.doris.analysis.CreateJobStmt;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.CreateMultiTableMaterializedViewStmt;
import org.apache.doris.analysis.CreatePolicyStmt;
@ -93,6 +94,7 @@ import org.apache.doris.analysis.DropWorkloadGroupStmt;
import org.apache.doris.analysis.GrantStmt;
import org.apache.doris.analysis.InstallPluginStmt;
import org.apache.doris.analysis.KillAnalysisJobStmt;
import org.apache.doris.analysis.PauseJobStmt;
import org.apache.doris.analysis.PauseRoutineLoadStmt;
import org.apache.doris.analysis.PauseSyncJobStmt;
import org.apache.doris.analysis.RecoverDbStmt;
@ -104,10 +106,12 @@ import org.apache.doris.analysis.RefreshLdapStmt;
import org.apache.doris.analysis.RefreshMaterializedViewStmt;
import org.apache.doris.analysis.RefreshTableStmt;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.ResumeJobStmt;
import org.apache.doris.analysis.ResumeRoutineLoadStmt;
import org.apache.doris.analysis.ResumeSyncJobStmt;
import org.apache.doris.analysis.RevokeStmt;
import org.apache.doris.analysis.SetUserPropertyStmt;
import org.apache.doris.analysis.StopJobStmt;
import org.apache.doris.analysis.StopRoutineLoadStmt;
import org.apache.doris.analysis.StopSyncJobStmt;
import org.apache.doris.analysis.SyncStmt;
@ -119,6 +123,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.load.sync.SyncJobManager;
import org.apache.doris.persist.CleanQueryStatsInfo;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.statistics.StatisticsRepository;
import org.apache.logging.log4j.LogManager;
@ -182,6 +187,17 @@ public class DdlExecutor {
env.getRoutineLoadManager().stopRoutineLoadJob((StopRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof AlterRoutineLoadStmt) {
env.getRoutineLoadManager().alterRoutineLoadJob((AlterRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof CreateJobStmt) {
env.getJobRegister().registerJob((((CreateJobStmt) ddlStmt).getJob()));
} else if (ddlStmt instanceof StopJobStmt) {
StopJobStmt stmt = (StopJobStmt) ddlStmt;
env.getJobRegister().stopJob(stmt.getDbFullName(), stmt.getName(), JobCategory.SQL);
} else if (ddlStmt instanceof PauseJobStmt) {
PauseJobStmt stmt = (PauseJobStmt) ddlStmt;
env.getJobRegister().pauseJob(stmt.getDbFullName(), stmt.getName(), JobCategory.SQL);
} else if (ddlStmt instanceof ResumeJobStmt) {
ResumeJobStmt stmt = (ResumeJobStmt) ddlStmt;
env.getJobRegister().resumeJob(stmt.getDbFullName(), stmt.getName(), JobCategory.SQL);
} else if (ddlStmt instanceof DeleteStmt) {
env.getDeleteHandler().process((DeleteStmt) ddlStmt);
} else if (ddlStmt instanceof CreateUserStmt) {

View File

@ -62,6 +62,8 @@ import org.apache.doris.analysis.ShowFrontendsStmt;
import org.apache.doris.analysis.ShowFunctionsStmt;
import org.apache.doris.analysis.ShowGrantsStmt;
import org.apache.doris.analysis.ShowIndexStmt;
import org.apache.doris.analysis.ShowJobStmt;
import org.apache.doris.analysis.ShowJobTaskStmt;
import org.apache.doris.analysis.ShowLastInsertStmt;
import org.apache.doris.analysis.ShowLoadProfileStmt;
import org.apache.doris.analysis.ShowLoadStmt;
@ -188,6 +190,9 @@ import org.apache.doris.mtmv.MTMVJobManager;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
@ -213,6 +218,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -423,6 +429,10 @@ public class ShowExecutor {
handleShowBuildIndexStmt();
} else if (stmt instanceof ShowAnalyzeTaskStatus) {
handleShowAnalyzeTaskStatus();
} else if (stmt instanceof ShowJobStmt) {
handleShowJob();
} else if (stmt instanceof ShowJobTaskStmt) {
handleShowJobTask();
} else {
handleEmtpy();
}
@ -1378,6 +1388,61 @@ public class ShowExecutor {
resultSet = new ShowResultSet(showWarningsStmt.getMetaData(), rows);
}
/**
 * Handles SHOW JOB TASKS: resolves the job by database + name (tasks are keyed by
 * job id), then renders one row per recorded task. An empty result set is returned
 * when no matching job or no tasks exist.
 */
private void handleShowJobTask() {
    ShowJobTaskStmt showStmt = (ShowJobTaskStmt) stmt;
    List<List<String>> rows = Lists.newArrayList();
    List<Job> matchedJobs = Env.getCurrentEnv().getJobRegister()
            .getJobs(showStmt.getDbFullName(), showStmt.getName(), JobCategory.SQL,
                    null);
    if (CollectionUtils.isEmpty(matchedJobs)) {
        resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
        return;
    }
    // Only the first matching job's tasks are shown, mirroring the lookup semantics above.
    long jobId = matchedJobs.get(0).getJobId();
    List<JobTask> tasks = Env.getCurrentEnv().getJobTaskManager().getJobTasks(jobId);
    if (!CollectionUtils.isEmpty(tasks)) {
        for (JobTask task : tasks) {
            rows.add(task.getShowInfo());
        }
    }
    resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
}
/**
 * Handles SHOW JOBS: looks up SQL-category jobs by database/name (optionally filtered
 * by a MySQL-style pattern), enforces SHOW privilege on every job's database, and
 * renders one row per job.
 *
 * @throws AnalysisException if the current user lacks SHOW privilege on any matched job's database
 */
private void handleShowJob() throws AnalysisException {
    ShowJobStmt showStmt = (ShowJobStmt) stmt;
    List<List<String>> rows = Lists.newArrayList();
    PatternMatcher matcher = showStmt.getPattern() == null
            ? null
            : PatternMatcherWrapper.createMysqlPattern(showStmt.getPattern(),
                    CaseSensibility.JOB.getCaseSensibility());
    List<Job> matchedJobs = Env.getCurrentEnv().getJobRegister()
            .getJobs(showStmt.getDbFullName(), showStmt.getName(), JobCategory.SQL,
                    matcher);
    if (matchedJobs.isEmpty()) {
        resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
        return;
    }
    // Verify privileges on every database before exposing any row.
    for (Job matchedJob : matchedJobs) {
        boolean hasPriv = Env.getCurrentEnv().getAccessManager()
                .checkDbPriv(ConnectContext.get(), matchedJob.getDbName(), PrivPredicate.SHOW);
        if (!hasPriv) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR,
                    ConnectContext.get().getQualifiedUser(), matchedJob.getDbName());
        }
    }
    for (Job matchedJob : matchedJobs) {
        rows.add(matchedJob.getShowInfo());
    }
    resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
}
private void handleShowRoutineLoad() throws AnalysisException {
ShowRoutineLoadStmt showRoutineLoadStmt = (ShowRoutineLoadStmt) stmt;
List<List<String>> rows = Lists.newArrayList();
@ -2533,7 +2598,7 @@ public class ShowExecutor {
public void handleShowCatalogs() throws AnalysisException {
ShowCatalogStmt showStmt = (ShowCatalogStmt) stmt;
resultSet = Env.getCurrentEnv().getCatalogMgr().showCatalogs(showStmt, ctx.getCurrentCatalog() != null
? ctx.getCurrentCatalog().getName() : null);
? ctx.getCurrentCatalog().getName() : null);
}
// Show create catalog
@ -2741,13 +2806,13 @@ public class ShowExecutor {
resultSet = new ShowResultSet(showMetaData, resultRowSet);
}
private void handleShowBuildIndexStmt() throws AnalysisException {
private void handleShowBuildIndexStmt() throws AnalysisException {
ShowBuildIndexStmt showStmt = (ShowBuildIndexStmt) stmt;
ProcNodeInterface procNodeI = showStmt.getNode();
Preconditions.checkNotNull(procNodeI);
// List<List<String>> rows = ((BuildIndexProcDir) procNodeI).fetchResult().getRows();
List<List<String>> rows = ((BuildIndexProcDir) procNodeI).fetchResultByFilter(showStmt.getFilterMap(),
showStmt.getOrderPairs(), showStmt.getLimitElement()).getRows();
showStmt.getOrderPairs(), showStmt.getLimitElement()).getRows();
resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
}

View File

@ -17,14 +17,18 @@
package org.apache.doris.scheduler;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.executor.JobExecutor;
import org.apache.doris.scheduler.job.AsyncJobManager;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.registry.JobRegister;
import org.apache.doris.scheduler.manager.AsyncJobManager;
import org.apache.doris.scheduler.registry.PersistentJobRegister;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.List;
/**
* This class registers timed scheduling events using the Netty time wheel algorithm to trigger events in a timely
@ -34,45 +38,70 @@ import java.io.IOException;
* consumption model that does not guarantee strict timing accuracy.
*/
@Slf4j
public class AsyncJobRegister implements JobRegister {
public class AsyncJobRegister implements PersistentJobRegister {
private final AsyncJobManager asyncJobManager;
public AsyncJobRegister() {
this.asyncJobManager = new AsyncJobManager();
public AsyncJobRegister(AsyncJobManager asyncJobManager) {
this.asyncJobManager = asyncJobManager;
}
@Override
public Long registerJob(String name, Long intervalMs, JobExecutor executor) {
public Long registerJob(String name, Long intervalMs, JobExecutor executor) throws DdlException {
return this.registerJob(name, intervalMs, null, null, executor);
}
@Override
public Long registerJob(String name, Long intervalMs, Long startTimeStamp, JobExecutor executor) {
return this.registerJob(name, intervalMs, startTimeStamp, null, executor);
public Long registerJob(String name, Long intervalMs, Long startTimeMs, JobExecutor executor) throws DdlException {
return this.registerJob(name, intervalMs, startTimeMs, null, executor);
}
@Override
public Long registerJob(String name, Long intervalMs, Long startTimeStamp, Long endTimeStamp,
JobExecutor executor) {
public Long registerJob(String name, Long intervalMs, Long startTimeMs, Long endTimeStamp,
JobExecutor executor) throws DdlException {
Job job = new Job(name, intervalMs, startTimeStamp, endTimeStamp, executor);
Job job = new Job(name, intervalMs, startTimeMs, endTimeStamp, executor);
return asyncJobManager.registerJob(job);
}
@Override
public Boolean pauseJob(Long jobId) {
return asyncJobManager.pauseJob(jobId);
public Long registerJob(Job job) throws DdlException {
return asyncJobManager.registerJob(job);
}
@Override
public Boolean stopJob(Long jobId) {
return asyncJobManager.stopJob(jobId);
public void pauseJob(Long jobId) {
asyncJobManager.pauseJob(jobId);
}
@Override
public Boolean resumeJob(Long jobId) {
return asyncJobManager.resumeJob(jobId);
public void pauseJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
asyncJobManager.pauseJob(dbName, jobName, jobCategory);
}
@Override
public void resumeJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
asyncJobManager.resumeJob(dbName, jobName, jobCategory);
}
@Override
public void stopJob(Long jobId) {
asyncJobManager.stopJob(jobId);
}
@Override
public void stopJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
asyncJobManager.stopJob(dbName, jobName, jobCategory);
}
@Override
public void resumeJob(Long jobId) {
asyncJobManager.resumeJob(jobId);
}
@Override
public List<Job> getJobs(String dbFullName, String jobName, JobCategory jobCategory, PatternMatcher matcher) {
return asyncJobManager.queryJob(dbFullName, jobName, jobCategory, matcher);
}
@Override

View File

@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.common;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
 * Time units accepted for a scheduler job's interval, each able to convert a
 * count of that unit into milliseconds.
 */
public enum IntervalUnit {
    SECOND("second", 0L, TimeUnit.SECONDS::toMillis),
    MINUTE("minute", 0L, TimeUnit.MINUTES::toMillis),
    HOUR("hour", 0L, TimeUnit.HOURS::toMillis),
    DAY("day", 0L, TimeUnit.DAYS::toMillis),
    WEEK("week", 0L, v -> TimeUnit.DAYS.toMillis(v * 7));

    private final String unit;
    // Returned by getParameterValue when the caller passes null; typed Long (not Object)
    // so no cast is needed.
    private final Long defaultValue;
    // Converts a count of this unit into milliseconds.
    private final Function<Long, Long> converter;

    IntervalUnit(String unit, Long defaultValue, Function<Long, Long> converter) {
        this.unit = unit;
        this.defaultValue = defaultValue;
        this.converter = converter;
    }

    public String getUnit() {
        return unit;
    }

    /**
     * Case-insensitive lookup by unit name.
     *
     * @param unit unit name such as "second" or "WEEK"
     * @return the matching unit, or null when the name is unknown
     */
    public static IntervalUnit fromString(String unit) {
        return Arrays.stream(IntervalUnit.values())
                .filter(u -> u.unit.equalsIgnoreCase(unit))
                .findFirst()
                .orElse(null);
    }

    /**
     * Case-sensitive lookup by unit name; unlike {@link #fromString(String)} this
     * throws when the name is unknown.
     *
     * @throws IllegalArgumentException when no unit matches {@code name}
     */
    IntervalUnit getByName(String name) {
        return Arrays.stream(IntervalUnit.values())
                .filter(config -> config.getUnit().equals(name))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown configuration " + name));
    }

    /**
     * Converts {@code param} units into milliseconds, or returns the default value
     * (0) when {@code param} is null.
     */
    public Long getParameterValue(Long param) {
        return param != null ? converter.apply(param) : defaultValue;
    }
}
}

View File

@ -15,27 +15,35 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler;
package org.apache.doris.scheduler.constants;
import org.apache.doris.scheduler.registry.JobRegister;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
/**
* This class provides a factory for creating instances of {@link JobRegister}.
* The factory ensures that only one instance of the client is created in a lazy manner.
* The job category is used to distinguish different types of jobs.
*/
public class JobRegisterFactory {
private static final AtomicReference<JobRegister> INSTANCE = new AtomicReference<>();
public enum JobCategory {
COMMON(1, "common"),
SQL(2, "sql"),
;
public static JobRegister getInstance() {
JobRegister instance = INSTANCE.get();
if (instance == null) {
instance = new AsyncJobRegister();
if (!INSTANCE.compareAndSet(null, instance)) {
instance = INSTANCE.get();
@Getter
private int code;
@Getter
private String description;
JobCategory(int code, String description) {
this.code = code;
this.description = description;
}
public static JobCategory getJobCategoryByCode(int code) {
for (JobCategory jobCategory : JobCategory.values()) {
if (jobCategory.getCode() == code) {
return jobCategory;
}
}
return instance;
return null;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.scheduler.constants;
public enum JobStatus {
/**
* When the task is not started, the initial state will be triggered.
* The initial state can be started
@ -35,4 +36,11 @@ public enum JobStatus {
* The stop state cannot be resumed
*/
STOPPED,
WAITING_FINISH,
/**
* When the task is finished, the finished state will be triggered.
*/
FINISHED
}

View File

@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.constants;
/**
 * Execution mode of a scheduler job.
 */
public enum JobType {
    /**
     * The job will be executed only once.
     */
    ONE_TIME,
    /**
     * The job will be executed periodically.
     */
    RECURRING,
    /**
     * Identifies a streaming job.
     */
    STREAMING
}

View File

@ -21,7 +21,7 @@ import lombok.Getter;
/**
* System scheduler event job
* They will start when scheduler starts
 * They will start when the scheduler starts. Don't use this job anywhere else; it is only for the system's internal scheduler.
*/
public enum SystemJob {

View File

@ -17,10 +17,10 @@
package org.apache.doris.scheduler.disruptor;
import org.apache.doris.scheduler.job.AsyncJobManager;
import org.apache.doris.scheduler.manager.AsyncJobManager;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventTranslatorTwoArg;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
@ -66,14 +66,11 @@ public class TimerTaskDisruptor implements Closeable {
private boolean isClosed = false;
/**
* The default {@link EventTranslatorTwoArg} to use for {@link #tryPublish(Long, Long)}.
* The default {@link EventTranslatorOneArg} to use for {@link #tryPublish(Long)}.
* This is used to avoid creating a new object for each publish.
*/
private static final EventTranslatorTwoArg<TimerTaskEvent, Long, Long> TRANSLATOR
= (event, sequence, jobId, taskId) -> {
event.setJobId(jobId);
event.setTaskId(taskId);
};
private static final EventTranslatorOneArg<TimerTaskEvent, Long> TRANSLATOR
= (event, sequence, jobId) -> event.setJobId(jobId);
public TimerTaskDisruptor(AsyncJobManager asyncJobManager) {
ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE;
@ -90,18 +87,17 @@ public class TimerTaskDisruptor implements Closeable {
/**
* Publishes an event to the disruptor.
*
* @param eventId event job id
* @param taskId event task id
* @param jobId job id
*/
public void tryPublish(Long eventId, Long taskId) {
public void tryPublish(Long jobId) {
if (isClosed) {
log.info("tryPublish failed, disruptor is closed, eventId: {}", eventId);
log.info("tryPublish failed, disruptor is closed, jobId: {}", jobId);
return;
}
try {
disruptor.publishEvent(TRANSLATOR, eventId, taskId);
disruptor.publishEvent(TRANSLATOR, jobId);
} catch (Exception e) {
log.error("tryPublish failed, eventId: {}", eventId, e);
log.error("tryPublish failed, jobId: {}", jobId, e);
}
}
@ -111,7 +107,7 @@ public class TimerTaskDisruptor implements Closeable {
return false;
}
try {
disruptor.publishEvent(TRANSLATOR, timerTaskEvent.getJobId(), timerTaskEvent.getTaskId());
disruptor.publishEvent(TRANSLATOR, timerTaskEvent.getJobId());
return true;
} catch (Exception e) {
log.error("tryPublish failed, eventJobId: {}", timerTaskEvent.getJobId(), e);

View File

@ -32,7 +32,5 @@ public class TimerTaskEvent {
private Long jobId;
private Long taskId;
public static final EventFactory<TimerTaskEvent> FACTORY = TimerTaskEvent::new;
}

View File

@ -17,9 +17,13 @@
package org.apache.doris.scheduler.disruptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.scheduler.constants.JobStatus;
import org.apache.doris.scheduler.constants.SystemJob;
import org.apache.doris.scheduler.job.AsyncJobManager;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.scheduler.manager.AsyncJobManager;
import org.apache.doris.scheduler.manager.JobTaskManager;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
@ -41,6 +45,8 @@ public class TimerTaskExpirationHandler implements WorkHandler<TimerTaskEvent> {
*/
private AsyncJobManager asyncJobManager;
private JobTaskManager jobTaskManager;
/**
* Constructs a new {@link TimerTaskExpirationHandler} instance with the specified event job manager.
*
@ -79,21 +85,38 @@ public class TimerTaskExpirationHandler implements WorkHandler<TimerTaskEvent> {
log.info("Event job is null, eventJobId: {}", jobId);
return;
}
if (!job.isRunning()) {
if (!job.isRunning() && !job.getJobStatus().equals(JobStatus.WAITING_FINISH)) {
log.info("Event job is not running, eventJobId: {}", jobId);
return;
}
log.debug("Event job is running, eventJobId: {}", jobId);
checkJobIsExpired(job);
JobTask jobTask = new JobTask(jobId);
try {
jobTask.setStartTimeMs(System.currentTimeMillis());
// TODO: We should record the result of the event task.
//Object result = job.getExecutor().execute();
job.getExecutor().execute();
job.setLatestCompleteExecuteTimestamp(System.currentTimeMillis());
job.getExecutor().execute(job);
job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis());
if (job.isCycleJob()) {
updateJobStatusIfPastEndTime(job);
} else {
// one time job should be finished after execute
updateOnceTimeJobStatus(job);
}
jobTask.setIsSuccessful(true);
} catch (Exception e) {
log.error("Event job execute failed, jobId: {}", jobId, e);
log.warn("Event job execute failed, jobId: {}, msg : {}", jobId, e.getMessage());
job.pause(e.getMessage());
jobTask.setErrorMsg(e.getMessage());
jobTask.setIsSuccessful(false);
}
jobTask.setEndTimeMs(System.currentTimeMillis());
if (null == jobTaskManager) {
jobTaskManager = Env.getCurrentEnv().getJobTaskManager();
}
jobTaskManager.addJobTask(jobTask);
}
/**
@ -117,9 +140,18 @@ public class TimerTaskExpirationHandler implements WorkHandler<TimerTaskEvent> {
return Objects.equals(event.getJobId(), SystemJob.SYSTEM_SCHEDULER_JOB.getId());
}
private void checkJobIsExpired(Job job) {
private void updateJobStatusIfPastEndTime(Job job) {
if (job.isExpired()) {
job.pause();
job.finish();
}
}
// Finalize a non-cycle job after a successful run: streaming jobs are handed back to the
// manager's queue for their next trigger, every other one-time job is marked finished.
private void updateOnceTimeJobStatus(Job job) {
    if (job.isStreamingJob()) {
        // NOTE(review): "putOneJobToQueen" looks like a typo for "Queue"; the method is
        // declared on AsyncJobManager, so it can only be renamed there, not here.
        asyncJobManager.putOneJobToQueen(job.getJobId());
        return;
    }
    job.finish();
}
}

View File

@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.exception;
/**
 * Checked exception thrown when a scheduler job fails to execute.
 */
public class JobException extends Exception {

    /**
     * @param message human-readable description of the failure
     */
    public JobException(String message) {
        super(message);
    }

    /**
     * @param message human-readable description of the failure
     * @param cause   the underlying exception; preserved for stack-trace chaining
     */
    public JobException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param cause the underlying exception; preserved for stack-trace chaining
     */
    public JobException(Throwable cause) {
        super(cause);
    }
}

View File

@ -17,9 +17,16 @@
package org.apache.doris.scheduler.executor;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.job.Job;
/**
* This interface represents a callback for an event registration. All event registrations
* must implement this interface to provide an execution method.
* We will persist JobExecutor in the database, and then execute it when the scheduler starts.
* We use Gson to serialize and deserialize JobExecutor. so the implementation of JobExecutor needs to be serializable.
* You can see @org.apache.doris.persist.gson.GsonUtils.java for details.When you implement JobExecutor,pls make sure
* you can serialize and deserialize it.
*
* @param <T> The result type of the event job execution.
*/
@ -32,6 +39,6 @@ public interface JobExecutor<T> {
*
* @return The result of the event job execution.
*/
T execute();
T execute(Job job) throws JobException;
}

View File

@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.executor;
import org.apache.doris.scheduler.exception.JobException;
/**
 * A functional interface for executing a job task.
 *
 * <p>NOTE(review): the type parameter {@code T} is currently unused — confirm whether a
 * result type is planned or the parameter should be removed.</p>
 */
@FunctionalInterface
public interface MemoryTaskExecutor<T> {

    /**
     * Executes the task.
     *
     * @throws JobException if the task fails to execute
     */
    void execute() throws JobException;
}

View File

@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.executor;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.thrift.TUniqueId;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.UUID;
/**
* we use this executor to execute sql job
* @param <QueryState> the state of sql job, we can record the state of sql job
*/
@Slf4j
public class SqlJobExecutor<QueryState> implements JobExecutor {

    /** The SQL text this executor runs; serialized so the job survives FE restarts. */
    @Getter
    @Setter
    @SerializedName(value = "sql")
    private String sql;

    public SqlJobExecutor(String sql) {
        this.sql = sql;
    }

    /**
     * Executes {@link #sql} inside a fresh {@code ConnectContext} built from the
     * job's database and user, and returns the resulting query state.
     *
     * @param job the job supplying the database name and user identity
     * @return the connect context's state after the statement has run
     * @throws JobException if statement execution fails for any reason
     */
    @Override
    public QueryState execute(Job job) throws JobException {
        ConnectContext ctx = new ConnectContext();
        ctx.setEnv(Env.getCurrentEnv());
        ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName()));
        ctx.setDatabase(job.getDbName());
        ctx.setQualifiedUser(job.getUser());
        // NOTE(review): host is hard-coded to "%" — confirm that is acceptable for auth checks.
        ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(job.getUser(), "%"));
        ctx.getState().reset();
        ctx.setThreadLocalInfo();
        // A random UUID is sufficient to uniquely identify this execution's query;
        // the previous UUID -> String -> UUID round-trip was redundant.
        UUID taskId = UUID.randomUUID();
        TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
        ctx.setQueryId(queryId);
        try {
            StmtExecutor executor = new StmtExecutor(ctx, sql);
            executor.execute(queryId);
            log.debug("execute sql job success, sql: {}, state is: {}", sql, ctx.getState());
            // The caller-visible result is the context's state; the cast is unavoidable
            // because the state type is only known via the class's type parameter.
            @SuppressWarnings("unchecked")
            QueryState state = (QueryState) ctx.getState();
            return state;
        } catch (Exception e) {
            // Pass the exception as the last SLF4J argument (no placeholder) so the
            // full stack trace is logged instead of just e.toString().
            log.warn("execute sql job failed, sql: {}", sql, e);
            throw new JobException("execute sql job failed, sql: " + sql + ", error: " + e.getMessage());
        }
    }
}

View File

@ -1,262 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.job;
import org.apache.doris.scheduler.disruptor.TimerTaskDisruptor;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import lombok.extern.slf4j.Slf4j;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Slf4j
public class AsyncJobManager implements Closeable {

    // All registered jobs, keyed by job id.
    private final ConcurrentHashMap<Long, Job> jobMap = new ConcurrentHashMap<>(128);

    // Start of the current 10-minute scheduling window (epoch millis); advanced
    // by executeJobIdsWithinLastTenMinutesWindow() each batch cycle.
    private long lastBatchSchedulerTimestamp;

    /**
     * batch scheduler interval time
     */
    private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = 10 * 60 * 1000L;

    // NOTE(review): this constant is not referenced anywhere in this class.
    private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;

    // Set by close(); once true, putOneTask() refuses new work.
    private boolean isClosed = false;

    /**
     * key: jobid
     * value: timeout list for one job
     * it's used to cancel tasks; if a task has started, it can't be canceled
     */
    private final ConcurrentHashMap<Long, Map<Long, Timeout>> jobTimeoutMap =
            new ConcurrentHashMap<>(128);

    /**
     * Time wheel used to trigger scheduled job tasks (1-second ticks, 660 slots).
     */
    private final HashedWheelTimer dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS,
            660);

    /**
     * Producer and Consumer model:
     * the disruptor is used to handle tasks and starts a thread pool for them.
     */
    private final TimerTaskDisruptor disruptor;

    public AsyncJobManager() {
        dorisTimer.start();
        this.disruptor = new TimerTaskDisruptor(this);
        this.lastBatchSchedulerTimestamp = System.currentTimeMillis();
        // Schedule the current window immediately, then keep re-scheduling
        // every BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS via the timer.
        batchSchedulerTasks();
        cycleSystemSchedulerTasks();
    }

    /**
     * Registers a job and, if its first run falls inside the current batch
     * window, queues the concrete execution timestamps onto the time wheel.
     *
     * @return the job id, or null when the job's time parameters are invalid
     */
    public Long registerJob(Job job) {
        if (!job.checkJobParam()) {
            log.warn("registerJob failed, job: {} param is invalid", job);
            return null;
        }
        // First run is one interval after the start time (or after "now" when
        // no explicit start time was given).
        if (job.getStartTimestamp() != 0L) {
            job.setNextExecuteTimestamp(job.getStartTimestamp() + job.getIntervalMilliSeconds());
        } else {
            job.setNextExecuteTimestamp(System.currentTimeMillis() + job.getIntervalMilliSeconds());
        }
        if (job.getNextExecuteTimestamp() < BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp) {
            List<Long> executeTimestamp = findTasksBetweenTime(job, System.currentTimeMillis(),
                    BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp,
                    job.getNextExecuteTimestamp());
            if (!executeTimestamp.isEmpty()) {
                for (Long timestamp : executeTimestamp) {
                    putOneTask(job.getJobId(), timestamp);
                }
            }
        }
        jobMap.putIfAbsent(job.getJobId(), job);
        return job.getJobId();
    }

    public void unregisterJob(Long jobId) {
        jobMap.remove(jobId);
    }

    /**
     * Pauses a job: cancels its queued (not yet started) tasks and flips its status.
     *
     * @return false when the job does not exist
     */
    public boolean pauseJob(Long jobId) {
        if (jobMap.get(jobId) == null) {
            log.warn("pauseJob failed, jobId: {} not exist", jobId);
            return false;
        }
        cancelJobAllTask(jobId);
        jobMap.get(jobId).pause();
        return true;
    }

    /**
     * Resumes a paused job. Note: tasks are re-queued only on the next batch cycle.
     *
     * @return false when the job does not exist
     */
    public boolean resumeJob(Long jobId) {
        if (jobMap.get(jobId) == null) {
            log.warn("resumeJob failed, jobId: {} not exist", jobId);
            return false;
        }
        jobMap.get(jobId).resume();
        return true;
    }

    /**
     * Stops a job permanently: cancels its queued tasks and marks it stopped.
     *
     * @return false when the job does not exist
     */
    public boolean stopJob(Long jobId) {
        if (jobMap.get(jobId) == null) {
            log.warn("stopJob failed, jobId: {} not exist", jobId);
            return false;
        }
        cancelJobAllTask(jobId);
        jobMap.get(jobId).stop();
        return true;
    }

    public Job getJob(Long jobId) {
        return jobMap.get(jobId);
    }

    // NOTE(review): exposes the internal mutable map directly.
    public Map<Long, Job> getAllJob() {
        return jobMap;
    }

    public boolean batchSchedulerTasks() {
        executeJobIdsWithinLastTenMinutesWindow();
        return true;
    }

    /**
     * Collects the job's execution timestamps from nextExecuteTime up to endTime
     * (inclusive), advancing the job's internal next-execute pointer as it goes.
     * Returns an empty list when startTime is still in the future.
     */
    public List<Long> findTasksBetweenTime(Job job, Long startTime, Long endTime, Long nextExecuteTime) {
        List<Long> jobExecuteTimes = new ArrayList<>();
        if (System.currentTimeMillis() < startTime) {
            return jobExecuteTimes;
        }
        while (endTime >= nextExecuteTime) {
            if (job.isTaskTimeExceeded()) {
                break;
            }
            jobExecuteTimes.add(nextExecuteTime);
            // Side effect: advances job.nextExecuteTimestamp by one interval.
            nextExecuteTime = job.getExecuteTimestampAndGeneratorNext();
        }
        return jobExecuteTimes;
    }

    /**
     * We will get the tasks in the next time window, and then hand them over to
     * the time wheel for timed triggering.
     */
    private void executeJobIdsWithinLastTenMinutesWindow() {
        if (jobMap.isEmpty()) {
            return;
        }
        jobMap.forEach((k, v) -> {
            if (v.isRunning() && (v.getNextExecuteTimestamp() + v.getIntervalMilliSeconds()
                    < lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS)) {
                List<Long> executeTimes = findTasksBetweenTime(v, lastBatchSchedulerTimestamp,
                        lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS,
                        v.getNextExecuteTimestamp());
                if (!executeTimes.isEmpty()) {
                    for (Long executeTime : executeTimes) {
                        putOneTask(v.getJobId(), executeTime);
                    }
                }
            }
        });
        // Advance the window start for the next batch cycle.
        this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
    }

    /**
     * We will cycle system scheduler tasks every 10 minutes.
     * Jobs will be re-registered after the task is completed.
     */
    private void cycleSystemSchedulerTasks() {
        dorisTimer.newTimeout(timeout -> {
            batchSchedulerTasks();
            cycleSystemSchedulerTasks();
        }, BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS, TimeUnit.MILLISECONDS);
    }

    /**
     * Puts one task on the time wheel, to be triggered after the computed delay.
     * Refuses new tasks after close(); the Timeout handle is tracked so the task
     * can later be cancelled.
     */
    public void putOneTask(Long jobId, Long startExecuteTime) {
        DorisTimerTask task = new DorisTimerTask(jobId, startExecuteTime, disruptor);
        if (isClosed) {
            log.info("putOneTask failed, scheduler is closed, jobId: {}", task.getJobId());
            return;
        }
        long delay = getDelaySecond(task.getStartTimestamp());
        Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS);
        if (timeout == null) {
            log.error("putOneTask failed, jobId: {}", task.getJobId());
            return;
        }
        if (jobTimeoutMap.containsKey(task.getJobId())) {
            jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout);
            return;
        }
        Map<Long, Timeout> timeoutMap = new ConcurrentHashMap<>();
        timeoutMap.put(task.getTaskId(), timeout);
        jobTimeoutMap.put(task.getJobId(), timeoutMap);
    }

    // cancel all tasks for one job
    // if a task has started, it can't be canceled
    public void cancelJobAllTask(Long jobId) {
        if (!jobTimeoutMap.containsKey(jobId)) {
            return;
        }
        jobTimeoutMap.get(jobId).values().forEach(timeout -> {
            // NOTE(review): the "|| isCancelled()" branch re-cancels already-cancelled
            // timeouts (a no-op in Netty); "!isExpired() && !isCancelled()" was likely intended.
            if (!timeout.isExpired() || timeout.isCancelled()) {
                timeout.cancel();
            }
        });
    }

    public void stopTask(Long jobId, Long taskId) {
        if (!jobTimeoutMap.containsKey(jobId)) {
            return;
        }
        cancelJobAllTask(jobId);
        jobTimeoutMap.get(jobId).remove(taskId);
    }

    // get delay time in seconds; if startTimestamp is earlier than now, return 0
    private long getDelaySecond(long startTimestamp) {
        long delay = 0;
        long now = System.currentTimeMillis();
        if (startTimestamp > now) {
            delay = startTimestamp - now;
        } else {
            log.warn("startTimestamp is less than now, startTimestamp: {}, now: {}", startTimestamp, now);
        }
        return delay / 1000;
    }

    @Override
    public void close() throws IOException {
        isClosed = true;
        dorisTimer.stop();
        disruptor.close();
    }
}

View File

@ -53,6 +53,6 @@ public class DorisTimerTask implements TimerTask {
if (timeout.isCancelled()) {
return;
}
timerTaskDisruptor.tryPublish(jobId, taskId);
timerTaskDisruptor.tryPublish(jobId);
}
}

View File

@ -17,12 +17,25 @@
package org.apache.doris.scheduler.job;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.scheduler.common.IntervalUnit;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.constants.JobStatus;
import org.apache.doris.scheduler.constants.JobType;
import org.apache.doris.scheduler.executor.JobExecutor;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import java.util.UUID;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
/**
* Job is the core of the scheduler module, which is used to store the Job information of the job module.
@ -32,53 +45,90 @@ import java.util.UUID;
* job.
*/
@Data
public class Job {
public class Job implements Writable {
public Job(String jobName, Long intervalMilliSeconds, Long startTimestamp, Long endTimestamp,
public Job(String jobName, Long intervalMilliSeconds, Long startTimeMs, Long endTimeMs,
JobExecutor executor) {
this.jobName = jobName;
this.executor = executor;
this.intervalMilliSeconds = intervalMilliSeconds;
this.startTimestamp = null == startTimestamp ? 0L : startTimestamp;
this.endTimestamp = null == endTimestamp ? 0L : endTimestamp;
this.intervalMs = intervalMilliSeconds;
this.startTimeMs = null == startTimeMs ? 0L : startTimeMs;
this.endTimeMs = null == endTimeMs ? 0L : endTimeMs;
this.jobStatus = JobStatus.RUNNING;
this.jobId = Env.getCurrentEnv().getNextId();
}
private Long jobId = UUID.randomUUID().getMostSignificantBits();
public Job() {
this.jobId = Env.getCurrentEnv().getNextId();
}
@SerializedName("jobId")
private Long jobId;
@SerializedName("jobName")
private String jobName;
@SerializedName("dbName")
private String dbName;
/**
* The status of the job, which is used to control the execution of the job.
*
* @see JobStatus
*/
private JobStatus jobStatus = JobStatus.RUNNING;
@SerializedName("jobStatus")
private JobStatus jobStatus;
/**
* The executor of the job.
*
* @see JobExecutor
*/
@SerializedName("executor")
private JobExecutor executor;
@SerializedName("user")
private String user;
@SerializedName("isCycleJob")
private boolean isCycleJob = false;
@SerializedName("isStreamingJob")
private boolean isStreamingJob = false;
@SerializedName("intervalMs")
private Long intervalMs = 0L;
@SerializedName("startTimeMs")
private Long startTimeMs = 0L;
@SerializedName("endTimeMs")
private Long endTimeMs = 0L;
@SerializedName("timezone")
private String timezone;
@SerializedName("jobCategory")
private JobCategory jobCategory;
@SerializedName("latestStartExecuteTimeMs")
private Long latestStartExecuteTimeMs = 0L;
@SerializedName("latestCompleteExecuteTimeMs")
private Long latestCompleteExecuteTimeMs = 0L;
@SerializedName("intervalUnit")
private IntervalUnit intervalUnit;
@SerializedName("originInterval")
private Long originInterval;
@SerializedName("nextExecuteTimeMs")
private Long nextExecuteTimeMs = 0L;
@SerializedName("comment")
private String comment;
@SerializedName("errMsg")
private String errMsg;
private Long intervalMilliSeconds;
private Long updateTime;
private Long nextExecuteTimestamp;
private Long startTimestamp = 0L;
private Long endTimestamp = 0L;
private Long firstExecuteTimestamp = 0L;
private Long latestStartExecuteTimestamp = 0L;
private Long latestCompleteExecuteTimestamp = 0L;
public boolean isRunning() {
return jobStatus == JobStatus.RUNNING;
}
@ -88,32 +138,32 @@ public class Job {
}
public boolean isExpired(long nextExecuteTimestamp) {
if (endTimestamp == 0L) {
if (endTimeMs == 0L) {
return false;
}
return nextExecuteTimestamp > endTimestamp;
return nextExecuteTimestamp > endTimeMs;
}
public boolean isTaskTimeExceeded() {
if (endTimestamp == 0L) {
if (endTimeMs == 0L) {
return false;
}
return System.currentTimeMillis() >= endTimestamp || nextExecuteTimestamp > endTimestamp;
return System.currentTimeMillis() >= endTimeMs || nextExecuteTimeMs > endTimeMs;
}
public boolean isExpired() {
if (endTimestamp == 0L) {
if (endTimeMs == 0L) {
return false;
}
return System.currentTimeMillis() >= endTimestamp;
return System.currentTimeMillis() >= endTimeMs;
}
public Long getExecuteTimestampAndGeneratorNext() {
this.latestStartExecuteTimestamp = nextExecuteTimestamp;
// todo The problem of delay should be considered. If it is greater than the ten-minute time window,
this.latestStartExecuteTimeMs = nextExecuteTimeMs;
// todo The problem of delay should be considered. If it is greater than the ten-minute time window,
// should the task be lost or executed on a new time window?
this.nextExecuteTimestamp = latestStartExecuteTimestamp + intervalMilliSeconds;
return nextExecuteTimestamp;
this.nextExecuteTimeMs = latestStartExecuteTimeMs + intervalMs;
return nextExecuteTimeMs;
}
public void pause() {
@ -125,6 +175,10 @@ public class Job {
this.errMsg = errMsg;
}
public void finish() {
this.jobStatus = JobStatus.FINISHED;
}
public void resume() {
this.jobStatus = JobStatus.RUNNING;
}
@ -134,15 +188,59 @@ public class Job {
}
public boolean checkJobParam() {
if (startTimestamp != 0L && startTimestamp < System.currentTimeMillis()) {
if (startTimeMs != 0L && startTimeMs < System.currentTimeMillis()) {
return false;
}
if (endTimestamp != 0L && endTimestamp < System.currentTimeMillis()) {
if (endTimeMs != 0L && endTimeMs < System.currentTimeMillis()) {
return false;
}
if (intervalMilliSeconds == null || intervalMilliSeconds <= 0L) {
if (isCycleJob && (intervalMs == null || intervalMs <= 0L)) {
return false;
}
if (null == jobCategory) {
return false;
}
return null != executor;
}
@Override
public void write(DataOutput out) throws IOException {
String jobData = GsonUtils.GSON.toJson(this);
Text.writeString(out, jobData);
}
public static Job readFields(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), Job.class);
}
public List<String> getShowInfo() {
List<String> row = Lists.newArrayList();
row.add(String.valueOf(jobId));
row.add(dbName);
row.add(jobName);
row.add(user);
row.add(timezone);
if (isCycleJob) {
row.add(JobType.RECURRING.name());
} else {
if (isStreamingJob) {
row.add(JobType.STREAMING.name());
} else {
row.add(JobType.ONE_TIME.name());
}
}
row.add(isCycleJob ? "null" : TimeUtils.longToTimeString(startTimeMs));
row.add(isCycleJob ? originInterval.toString() : "null");
row.add(isCycleJob ? intervalUnit.name() : "null");
row.add(isCycleJob && startTimeMs > 0 ? TimeUtils.longToTimeString(startTimeMs) : "null");
row.add(isCycleJob && endTimeMs > 0 ? TimeUtils.longToTimeString(endTimeMs) : "null");
row.add(jobStatus.name());
row.add(latestCompleteExecuteTimeMs <= 0L ? "null" : TimeUtils.longToTimeString(latestCompleteExecuteTimeMs));
row.add(errMsg == null ? "null" : errMsg);
row.add(comment == null ? "null" : comment);
return row;
}
}

View File

@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.job;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
/**
 * A single execution record of a scheduler job: when it started, when it
 * finished, whether it succeeded, and any error message. Persisted as JSON
 * via GSON through the {@link Writable} interface.
 */
@Data
public class JobTask implements Writable {

    @SerializedName("jobId")
    private Long jobId;

    @SerializedName("taskId")
    private Long taskId;

    @SerializedName("startTimeMs")
    private Long startTimeMs;

    // Null while the task is still running.
    @SerializedName("endTimeMs")
    private Long endTimeMs;

    @SerializedName("successful")
    private Boolean isSuccessful;

    @SerializedName("errorMsg")
    private String errorMsg;

    public JobTask(Long jobId) {
        //it's enough to use nanoTime to identify a task
        this.taskId = System.nanoTime();
        this.jobId = jobId;
    }

    /**
     * Renders this task as a SHOW-statement row:
     * jobId, taskId, start time, end time, state, error message.
     */
    public List<String> getShowInfo() {
        List<String> row = Lists.newArrayList();
        row.add(String.valueOf(jobId));
        row.add(String.valueOf(taskId));
        // Guard against a task whose start time was never recorded.
        row.add(null == startTimeMs ? "null" : TimeUtils.longToTimeString(startTimeMs));
        row.add(null == endTimeMs ? "null" : TimeUtils.longToTimeString(endTimeMs));
        if (endTimeMs == null) {
            row.add("RUNNING");
        } else {
            // Boolean.TRUE.equals avoids an NPE from unboxing when isSuccessful
            // was never set on a finished task.
            row.add(Boolean.TRUE.equals(isSuccessful) ? "SUCCESS" : "FAILED");
        }
        if (null == errorMsg) {
            row.add("null");
        } else {
            row.add(errorMsg);
        }
        return row;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        String jobData = GsonUtils.GSON.toJson(this);
        Text.writeString(out, jobData);
    }

    public static JobTask readFields(DataInput in) throws IOException {
        return GsonUtils.GSON.fromJson(Text.readString(in), JobTask.class);
    }
}

View File

@ -0,0 +1,481 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.manager;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.constants.JobStatus;
import org.apache.doris.scheduler.disruptor.TimerTaskDisruptor;
import org.apache.doris.scheduler.job.DorisTimerTask;
import org.apache.doris.scheduler.job.Job;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Slf4j
public class AsyncJobManager implements Closeable, Writable {
private final ConcurrentHashMap<Long, Job> jobMap = new ConcurrentHashMap<>(128);
private long lastBatchSchedulerTimestamp;
/**
* batch scheduler interval time
*/
private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = 10 * 60 * 1000L;
private boolean isClosed = false;
/**
* key: jobid
* value: timeout list for one job
* it's used to cancel task, if task has started, it can't be canceled
*/
private final ConcurrentHashMap<Long, Map<Long, Timeout>> jobTimeoutMap = new ConcurrentHashMap<>(128);
/**
* scheduler tasks, it's used to scheduler job
*/
private final HashedWheelTimer dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660);
/**
* Producer and Consumer model
* disruptor is used to handle task
* disruptor will start a thread pool to handle task
*/
private final TimerTaskDisruptor disruptor;
public AsyncJobManager() {
dorisTimer.start();
this.disruptor = new TimerTaskDisruptor(this);
this.lastBatchSchedulerTimestamp = System.currentTimeMillis();
batchSchedulerTasks();
cycleSystemSchedulerTasks();
}
public Long registerJob(Job job) throws DdlException {
if (!job.checkJobParam()) {
throw new DdlException("Job param is invalid, please check time param");
}
checkIsJobNameUsed(job.getDbName(), job.getJobName(), job.getJobCategory());
jobMap.putIfAbsent(job.getJobId(), job);
initAndSchedulerJob(job);
Env.getCurrentEnv().getEditLog().logCreateJob(job);
return job.getJobId();
}
public void replayCreateJob(Job job) {
jobMap.putIfAbsent(job.getJobId(), job);
initAndSchedulerJob(job);
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
.add("msg", "replay create scheduler job").build());
}
/**
* Replay update load job.
**/
public void replayUpdateJob(Job job) {
jobMap.put(job.getJobId(), job);
if (JobStatus.RUNNING.equals(job.getJobStatus())) {
initAndSchedulerJob(job);
}
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
.add("msg", "replay update scheduler job").build());
}
public void replayDeleteJob(Job job) {
if (null == jobMap.get(job.getJobId())) {
return;
}
jobMap.remove(job.getJobId());
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
.add("msg", "replay delete scheduler job").build());
}
private void checkIsJobNameUsed(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
Optional<Job> optionalJob = jobMap.values().stream().filter(job -> job.getJobCategory().equals(jobCategory))
.filter(job -> job.getDbName().equals(dbName))
.filter(job -> job.getJobName().equals(jobName)).findFirst();
if (optionalJob.isPresent()) {
throw new DdlException("Name " + jobName + " already used in db " + dbName);
}
}
private void initAndSchedulerJob(Job job) {
if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
return;
}
Long currentTimeMs = System.currentTimeMillis();
Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(),
job.getIntervalMs(), job.isCycleJob());
job.setNextExecuteTimeMs(nextExecuteTimeMs);
if (job.getNextExecuteTimeMs() < BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp) {
List<Long> executeTimestamp = findTasksBetweenTime(job,
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp,
job.getNextExecuteTimeMs());
if (!executeTimestamp.isEmpty()) {
for (Long timestamp : executeTimestamp) {
putOneTask(job.getJobId(), timestamp);
}
}
}
}
private Long findFistExecuteTime(Long currentTimeMs, Long startTimeMs, Long intervalMs, boolean isCycleJob) {
if (startTimeMs != 0L && startTimeMs > currentTimeMs) {
return startTimeMs;
}
// if it's not cycle job and already delay, next execute time is current time
if (!isCycleJob) {
return currentTimeMs;
}
long cycle = (currentTimeMs - startTimeMs) / intervalMs;
if ((currentTimeMs - startTimeMs) % intervalMs > 0) {
cycle += 1;
}
return startTimeMs + cycle * intervalMs;
}
public void unregisterJob(Long jobId) {
jobMap.remove(jobId);
}
public void pauseJob(Long jobId) {
Job job = jobMap.get(jobId);
if (jobMap.get(jobId) == null) {
log.warn("pauseJob failed, jobId: {} not exist", jobId);
}
if (jobMap.get(jobId).getJobStatus().equals(JobStatus.PAUSED)) {
log.warn("pauseJob failed, jobId: {} is already paused", jobId);
}
pauseJob(job);
}
public void stopJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
Optional<Job> optionalJob = findJob(dbName, jobName, jobCategory);
if (!optionalJob.isPresent()) {
throw new DdlException("Job " + jobName + " not exist in db " + dbName);
}
Job job = optionalJob.get();
if (job.getJobStatus().equals(JobStatus.STOPPED)) {
throw new DdlException("Job " + jobName + " is already stopped");
}
stopJob(optionalJob.get());
Env.getCurrentEnv().getEditLog().logDeleteJob(optionalJob.get());
}
private void stopJob(Job job) {
if (JobStatus.RUNNING.equals(job.getJobStatus())) {
cancelJobAllTask(job.getJobId());
}
job.setJobStatus(JobStatus.STOPPED);
jobMap.get(job.getJobId()).stop();
Env.getCurrentEnv().getEditLog().logDeleteJob(job);
Env.getCurrentEnv().getJobTaskManager().deleteJobTasks(job.getJobId());
}
public void resumeJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
Optional<Job> optionalJob = findJob(dbName, jobName, jobCategory);
if (!optionalJob.isPresent()) {
throw new DdlException("Job " + jobName + " not exist in db " + dbName);
}
Job job = optionalJob.get();
if (!job.getJobStatus().equals(JobStatus.PAUSED)) {
throw new DdlException("Job " + jobName + " is not paused");
}
resumeJob(job);
}
private void resumeJob(Job job) {
cancelJobAllTask(job.getJobId());
job.setJobStatus(JobStatus.RUNNING);
jobMap.get(job.getJobId()).resume();
initAndSchedulerJob(job);
Env.getCurrentEnv().getEditLog().logUpdateJob(job);
}
public void pauseJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException {
Optional<Job> optionalJob = findJob(dbName, jobName, jobCategory);
if (!optionalJob.isPresent()) {
throw new DdlException("Job " + jobName + " not exist in db " + dbName);
}
Job job = optionalJob.get();
if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
throw new DdlException("Job " + jobName + " is not running");
}
pauseJob(job);
}
private void pauseJob(Job job) {
cancelJobAllTask(job.getJobId());
job.setJobStatus(JobStatus.PAUSED);
jobMap.get(job.getJobId()).pause();
Env.getCurrentEnv().getEditLog().logUpdateJob(job);
}
private Optional<Job> findJob(String dbName, String jobName, JobCategory jobCategory) {
return jobMap.values().stream().filter(job -> checkJobMatch(job, dbName, jobName, jobCategory)).findFirst();
}
private boolean checkJobMatch(Job job, String dbName, String jobName, JobCategory jobCategory) {
return job.getDbName().equals(dbName) && job.getJobName().equals(jobName)
&& job.getJobCategory().equals(jobCategory);
}
public void resumeJob(Long jobId) {
if (jobMap.get(jobId) == null) {
log.warn("resumeJob failed, jobId: {} not exist", jobId);
return;
}
Job job = jobMap.get(jobId);
resumeJob(job);
}
public void stopJob(Long jobId) {
Job job = jobMap.get(jobId);
if (null == job) {
log.warn("stopJob failed, jobId: {} not exist", jobId);
return;
}
if (job.getJobStatus().equals(JobStatus.STOPPED)) {
log.warn("stopJob failed, jobId: {} is already stopped", jobId);
return;
}
stopJob(job);
}
public Job getJob(Long jobId) {
return jobMap.get(jobId);
}
public Map<Long, Job> getAllJob() {
return jobMap;
}
public void batchSchedulerTasks() {
executeJobIdsWithinLastTenMinutesWindow();
}
private List<Long> findTasksBetweenTime(Job job, Long endTimeEndWindow, Long nextExecuteTime) {
List<Long> jobExecuteTimes = new ArrayList<>();
if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) {
jobExecuteTimes.add(nextExecuteTime);
if (job.isStreamingJob()) {
job.setJobStatus(JobStatus.RUNNING);
} else {
job.setJobStatus(JobStatus.WAITING_FINISH);
}
return jobExecuteTimes;
}
while (endTimeEndWindow >= nextExecuteTime) {
if (job.isTaskTimeExceeded()) {
break;
}
jobExecuteTimes.add(nextExecuteTime);
nextExecuteTime = job.getExecuteTimestampAndGeneratorNext();
}
return jobExecuteTimes;
}
/**
* We will get the task in the next time window, and then hand it over to the time wheel for timing trigger
*/
private void executeJobIdsWithinLastTenMinutesWindow() {
if (jobMap.isEmpty()) {
return;
}
jobMap.forEach((k, v) -> {
if (v.isRunning() && (v.getNextExecuteTimeMs()
+ v.getIntervalMs() < lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS)) {
List<Long> executeTimes = findTasksBetweenTime(
v, lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS,
v.getNextExecuteTimeMs());
if (!executeTimes.isEmpty()) {
for (Long executeTime : executeTimes) {
putOneTask(v.getJobId(), executeTime);
}
}
}
});
this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
}
/**
* We will cycle system scheduler tasks every 10 minutes.
* Jobs will be re-registered after the task is completed
*/
private void cycleSystemSchedulerTasks() {
dorisTimer.newTimeout(timeout -> {
batchSchedulerTasks();
cycleSystemSchedulerTasks();
}, BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS, TimeUnit.MILLISECONDS);
}
/**
* put one task to time wheel,it's well be trigger after delay milliseconds
* if the scheduler is closed, the task will not be put into the time wheel
* if delay is less than 0, the task will be trigger immediately
*
* @param jobId job id, we will use it to find the job
* @param startExecuteTime the task will be trigger in this time, unit is millisecond,and we will convert it to
* delay seconds, we just can be second precision
*/
public void putOneTask(Long jobId, Long startExecuteTime) {
DorisTimerTask task = new DorisTimerTask(jobId, startExecuteTime, disruptor);
if (isClosed) {
log.info("putOneTask failed, scheduler is closed, jobId: {}", task.getJobId());
return;
}
long delay = getDelaySecond(task.getStartTimestamp());
Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS);
if (timeout == null) {
log.error("putOneTask failed, jobId: {}", task.getJobId());
return;
}
if (jobTimeoutMap.containsKey(task.getJobId())) {
jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout);
return;
}
Map<Long, Timeout> timeoutMap = new ConcurrentHashMap<>();
timeoutMap.put(task.getTaskId(), timeout);
jobTimeoutMap.put(task.getJobId(), timeoutMap);
}
// cancel all task for one job
// if task has started, it can't be canceled
public void cancelJobAllTask(Long jobId) {
if (!jobTimeoutMap.containsKey(jobId)) {
return;
}
jobTimeoutMap.get(jobId).values().forEach(timeout -> {
if (!timeout.isExpired() || timeout.isCancelled()) {
timeout.cancel();
}
});
}
public void stopTask(Long jobId, Long taskId) {
if (!jobTimeoutMap.containsKey(jobId)) {
return;
}
cancelJobAllTask(jobId);
jobTimeoutMap.get(jobId).remove(taskId);
}
// get delay time, if startTimestamp is less than now, return 0
private long getDelaySecond(long startTimestamp) {
long delay = 0;
long now = System.currentTimeMillis();
if (startTimestamp > now) {
delay = startTimestamp - now;
} else {
//if execute time is less than now, return 0,immediately execute
log.warn("startTimestamp is less than now, startTimestamp: {}, now: {}", startTimestamp, now);
return delay;
}
return delay / 1000;
}
@Override
public void close() throws IOException {
isClosed = true;
dorisTimer.stop();
disruptor.close();
}
/**
* sort by job id
*
* @param dbFullName database name
* @param category job category
* @param matcher job name matcher
*/
public List<Job> queryJob(String dbFullName, String jobName, JobCategory category, PatternMatcher matcher) {
List<Job> jobs = new ArrayList<>();
jobMap.values().forEach(job -> {
if (matchJob(job, dbFullName, jobName, category, matcher)) {
jobs.add(job);
}
});
return jobs;
}
private boolean matchJob(Job job, String dbFullName, String jobName, JobCategory category, PatternMatcher matcher) {
if (StringUtils.isNotBlank(dbFullName) && !job.getDbName().equalsIgnoreCase(dbFullName)) {
return false;
}
if (StringUtils.isNotBlank(jobName) && !job.getJobName().equalsIgnoreCase(jobName)) {
return false;
}
if (category != null && !job.getJobCategory().equals(category)) {
return false;
}
return null == matcher || matcher.match(job.getJobName());
}
/**
 * Publishes one job id onto the disruptor ring buffer so its task is executed.
 * NOTE(review): "Queen" in the method name is presumably a typo for "Queue";
 * renaming would break callers, so it is only flagged here.
 *
 * @param jobId id of the job to enqueue for execution
 */
public void putOneJobToQueen(Long jobId) {
    disruptor.tryPublish(jobId);
}
/**
 * Serializes all registered jobs: a count, followed by each Job's own
 * serialized form (see Job#write). Mirrors the format read by readFields.
 *
 * @param out destination stream
 * @throws IOException if writing to the stream fails
 */
@Override
public void write(DataOutput out) throws IOException {
    out.writeInt(jobMap.size());
    for (Job job : jobMap.values()) {
        job.write(out);
    }
}
/**
 * Deserializes jobs written by {@link #write(DataOutput)} and re-registers
 * each one with the scheduler so replayed jobs keep running.
 *
 * @param in data input to read from
 * @throws IOException io exception when read data input error
 */
public void readFields(DataInput in) throws IOException {
    int size = in.readInt();
    for (int i = 0; i < size; i++) {
        Job job = Job.readFields(in);
        jobMap.put(job.getJobId(), job);
        // re-arm the timer for the restored job
        initAndSchedulerJob(job);
    }
}
}

View File

@ -0,0 +1,121 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.manager;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.scheduler.job.JobTask;
import lombok.extern.slf4j.Slf4j;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Keeps an in-memory, bounded history of the tasks executed for each scheduler
 * job, persisting task creation/eviction through the edit log so the history
 * can be replayed after an FE restart.
 */
@Slf4j
public class JobTaskManager implements Writable {

    // Max number of task records retained per job; oldest records are evicted.
    private static final Integer TASK_MAX_NUM = Config.scheduler_job_task_max_num;

    // jobId -> FIFO queue of that job's task records (oldest first).
    private ConcurrentHashMap<Long, ConcurrentLinkedQueue<JobTask>> jobTaskMap = new ConcurrentHashMap<>(16);

    /**
     * Records a newly executed task, evicting (and edit-logging the eviction of)
     * the oldest record once the per-job cap is exceeded.
     *
     * @param jobTask the task record to store and persist
     */
    public void addJobTask(JobTask jobTask) {
        ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap
                .computeIfAbsent(jobTask.getJobId(), k -> new ConcurrentLinkedQueue<>());
        jobTasks.add(jobTask);
        if (jobTasks.size() > TASK_MAX_NUM) {
            // poll() cannot return null here: the queue was just observed over the cap
            JobTask oldTask = jobTasks.poll();
            Env.getCurrentEnv().getEditLog().logDeleteJobTask(oldTask);
        }
        Env.getCurrentEnv().getEditLog().logCreateJobTask(jobTask);
    }

    /**
     * Returns the job's task records, newest first; an empty list for an
     * unknown job id.
     */
    public List<JobTask> getJobTasks(Long jobId) {
        // single get() instead of containsKey()+get() avoids a race with deleteJobTasks
        ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap.get(jobId);
        if (jobTasks == null) {
            return new ArrayList<>();
        }
        List<JobTask> jobTaskList = new LinkedList<>(jobTasks);
        Collections.reverse(jobTaskList);
        return jobTaskList;
    }

    /** Replays a task-creation edit-log entry into the in-memory history. */
    public void replayCreateTask(JobTask task) {
        ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap
                .computeIfAbsent(task.getJobId(), k -> new ConcurrentLinkedQueue<>());
        jobTasks.add(task);
        log.info(new LogBuilder(LogKey.SCHEDULER_TASK, task.getTaskId())
                .add("msg", "replay create scheduler task").build());
    }

    /** Replays a task-deletion edit-log entry into the in-memory history. */
    public void replayDeleteTask(JobTask task) {
        ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap.get(task.getJobId());
        if (jobTasks != null) {
            jobTasks.remove(task);
        }
        log.info(new LogBuilder(LogKey.SCHEDULER_TASK, task.getTaskId())
                .add("msg", "replay delete scheduler task").build());
    }

    /**
     * Drops every task record of a job (e.g. when the job itself is removed).
     * The original implementation polled a single element and NPE'd on an
     * empty queue; the whole queue is now detached atomically and each
     * removed record is logged.
     */
    public void deleteJobTasks(Long jobId) {
        ConcurrentLinkedQueue<JobTask> jobTasks = jobTaskMap.remove(jobId);
        if (jobTasks != null) {
            for (JobTask jobTask : jobTasks) {
                log.info(new LogBuilder(LogKey.SCHEDULER_TASK, jobTask.getTaskId())
                        .add("msg", "delete scheduler task").build());
            }
        }
    }

    /**
     * Serializes the task map: job count, then per job its id, its task count
     * and each task's serialized form. Mirrors the format read by readFields.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(jobTaskMap.size());
        for (Map.Entry<Long, ConcurrentLinkedQueue<JobTask>> entry : jobTaskMap.entrySet()) {
            out.writeLong(entry.getKey());
            out.writeInt(entry.getValue().size());
            for (JobTask jobTask : entry.getValue()) {
                jobTask.write(out);
            }
        }
    }

    /**
     * Deserializes the task map written by {@link #write(DataOutput)}.
     *
     * @throws IOException if reading from the stream fails
     */
    public void readFields(DataInput in) throws IOException {
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            Long jobId = in.readLong();
            int taskSize = in.readInt();
            ConcurrentLinkedQueue<JobTask> jobTasks = new ConcurrentLinkedQueue<>();
            for (int j = 0; j < taskSize; j++) {
                JobTask jobTask = JobTask.readFields(in);
                jobTasks.add(jobTask);
            }
            jobTaskMap.put(jobId, jobTasks);
        }
    }
}

View File

@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.registry;
import org.apache.doris.scheduler.executor.MemoryTaskExecutor;
/**
 * Registry contract for one-shot tasks held purely in memory (not persisted
 * through the edit log).
 * todo: Support in-memory job registration in the future.
 */
public interface MemoryTaskRegister {

    /**
     * Registers an in-memory task for execution.
     *
     * @param executor the executor that performs the task's work
     * @return the id assigned to the registered task
     */
    Long registerTask(MemoryTaskExecutor executor);

    /**
     * Cancels a previously registered task.
     *
     * @param taskId id returned by {@link #registerTask(MemoryTaskExecutor)}
     */
    void cancelTask(Long taskId);
}

View File

@ -17,26 +17,31 @@
package org.apache.doris.scheduler.registry;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.executor.JobExecutor;
import org.apache.doris.scheduler.job.Job;
import java.io.IOException;
import java.util.List;
/**
* This interface provides a contract for registering timed scheduling events.
* The implementation should trigger events in a timely manner using a specific algorithm.
* The execution of the events may be asynchronous and not guarantee strict timing accuracy.
*/
public interface JobRegister {
public interface PersistentJobRegister {
/**
* Register a job
*
* @param name job name,it's not unique
* @param intervalMs job interval, unit: ms
* @param executor job executor @See {@link JobExecutor}
* @param name job name,it's not unique
* @param intervalMs job interval, unit: ms
* @param executor job executor @See {@link JobExecutor}
* @return event job id
*/
Long registerJob(String name, Long intervalMs, JobExecutor executor);
Long registerJob(String name, Long intervalMs, JobExecutor executor) throws DdlException;
/**
* Register a job
@ -49,7 +54,7 @@ public interface JobRegister {
* @param executor event job executor @See {@link JobExecutor}
* @return job id
*/
Long registerJob(String name, Long intervalMs, Long startTimeStamp, JobExecutor executor);
Long registerJob(String name, Long intervalMs, Long startTimeStamp, JobExecutor executor) throws DdlException;
/**
@ -68,18 +73,21 @@ public interface JobRegister {
* @return event job id
*/
Long registerJob(String name, Long intervalMs, Long startTimeStamp, Long endTimeStamp,
JobExecutor executor);
JobExecutor executor) throws DdlException;
/**
* if job is running, pause it
* pause means event job will not be executed in the next cycle,but current cycle will not be interrupted
* we can resume it by {@link #resumeJob(Long)}
*
* @param eventId event job id
* if eventId not exist, return false
* @return true if pause success, false if pause failed
* @param jodId job id
* if jobId not exist, return false
*/
Boolean pauseJob(Long jodId);
void pauseJob(Long jodId);
void pauseJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException;
void resumeJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException;
/**
* if job is running, stop it
@ -88,17 +96,21 @@ public interface JobRegister {
* we will delete stopped event job
*
* @param jobId event job id
* @return true if stop success, false if stop failed
*/
Boolean stopJob(Long jobId);
void stopJob(Long jobId);
void stopJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException;
/**
* if job is paused, resume it
*
* @param jobId job id
* @return true if resume success, false if resume failed
*/
Boolean resumeJob(Long jobId);
void resumeJob(Long jobId);
Long registerJob(Job job) throws DdlException;
List<Job> getJobs(String dbFullName, String jobName, JobCategory jobCategory, PatternMatcher matcher);
/**
* close job scheduler register

View File

@ -109,6 +109,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("auto_increment", new Integer(SqlParserSymbols.KW_AUTO_INCREMENT));
keywordMap.put("as", new Integer(SqlParserSymbols.KW_AS));
keywordMap.put("asc", new Integer(SqlParserSymbols.KW_ASC));
keywordMap.put("at", new Integer(SqlParserSymbols.KW_AT));
keywordMap.put("authors", new Integer(SqlParserSymbols.KW_AUTHORS));
keywordMap.put("backend", new Integer(SqlParserSymbols.KW_BACKEND));
keywordMap.put("backends", new Integer(SqlParserSymbols.KW_BACKENDS));
@ -196,6 +197,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("distributed", new Integer(SqlParserSymbols.KW_DISTRIBUTED));
keywordMap.put("distribution", new Integer(SqlParserSymbols.KW_DISTRIBUTION));
keywordMap.put("div", new Integer(SqlParserSymbols.KW_DIV));
keywordMap.put("do", new Integer(SqlParserSymbols.KW_DO));
keywordMap.put("double", new Integer(SqlParserSymbols.KW_DOUBLE));
keywordMap.put("drop", new Integer(SqlParserSymbols.KW_DROP));
keywordMap.put("dropp", new Integer(SqlParserSymbols.KW_DROPP));
@ -206,11 +208,13 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("encryptkey", new Integer(SqlParserSymbols.KW_ENCRYPTKEY));
keywordMap.put("encryptkeys", new Integer(SqlParserSymbols.KW_ENCRYPTKEYS));
keywordMap.put("end", new Integer(SqlParserSymbols.KW_END));
keywordMap.put("ends", new Integer(SqlParserSymbols.KW_ENDS));
keywordMap.put("engine", new Integer(SqlParserSymbols.KW_ENGINE));
keywordMap.put("engines", new Integer(SqlParserSymbols.KW_ENGINES));
keywordMap.put("enter", new Integer(SqlParserSymbols.KW_ENTER));
keywordMap.put("errors", new Integer(SqlParserSymbols.KW_ERRORS));
keywordMap.put("events", new Integer(SqlParserSymbols.KW_EVENTS));
keywordMap.put("every", new Integer(SqlParserSymbols.KW_EVERY));
keywordMap.put("except", new Integer(SqlParserSymbols.KW_EXCEPT));
keywordMap.put("exclude", new Integer(SqlParserSymbols.KW_EXCLUDE));
keywordMap.put("exists", new Integer(SqlParserSymbols.KW_EXISTS));
@ -279,6 +283,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("isnull", new Integer(SqlParserSymbols.KW_ISNULL));
keywordMap.put("isolation", new Integer(SqlParserSymbols.KW_ISOLATION));
keywordMap.put("job", new Integer(SqlParserSymbols.KW_JOB));
keywordMap.put("jobs", new Integer(SqlParserSymbols.KW_JOBS));
keywordMap.put("join", new Integer(SqlParserSymbols.KW_JOIN));
keywordMap.put("json", new Integer(SqlParserSymbols.KW_JSON));
keywordMap.put("jsonb", new Integer(SqlParserSymbols.KW_JSONB));
@ -410,6 +415,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("rows", new Integer(SqlParserSymbols.KW_ROWS));
keywordMap.put("s3", new Integer(SqlParserSymbols.KW_S3));
keywordMap.put("schema", new Integer(SqlParserSymbols.KW_SCHEMA));
keywordMap.put("scheduler", new Integer(SqlParserSymbols.KW_SCHEDULER));
keywordMap.put("schemas", new Integer(SqlParserSymbols.KW_SCHEMAS));
keywordMap.put("second", new Integer(SqlParserSymbols.KW_SECOND));
keywordMap.put("select", new Integer(SqlParserSymbols.KW_SELECT));
@ -428,11 +434,13 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("sql_block_rule", new Integer(SqlParserSymbols.KW_SQL_BLOCK_RULE));
keywordMap.put("sample", new Integer(SqlParserSymbols.KW_SAMPLE));
keywordMap.put("start", new Integer(SqlParserSymbols.KW_START));
keywordMap.put("starts", new Integer(SqlParserSymbols.KW_STARTS));
keywordMap.put("stats", new Integer(SqlParserSymbols.KW_STATS));
keywordMap.put("status", new Integer(SqlParserSymbols.KW_STATUS));
keywordMap.put("stop", new Integer(SqlParserSymbols.KW_STOP));
keywordMap.put("storage", new Integer(SqlParserSymbols.KW_STORAGE));
keywordMap.put("stream", new Integer(SqlParserSymbols.KW_STREAM));
keywordMap.put("streaming", new Integer(SqlParserSymbols.KW_STREAMING));
keywordMap.put("string", new Integer(SqlParserSymbols.KW_STRING));
keywordMap.put("struct", new Integer(SqlParserSymbols.KW_STRUCT));
keywordMap.put("sum", new Integer(SqlParserSymbols.KW_SUM));
@ -446,6 +454,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("tablet", new Integer(SqlParserSymbols.KW_TABLET));
keywordMap.put("tablets", new Integer(SqlParserSymbols.KW_TABLETS));
keywordMap.put("task", new Integer(SqlParserSymbols.KW_TASK));
keywordMap.put("tasks", new Integer(SqlParserSymbols.KW_TASKS));
keywordMap.put("temporary", new Integer(SqlParserSymbols.KW_TEMPORARY));
keywordMap.put("terminated", new Integer(SqlParserSymbols.KW_TERMINATED));
keywordMap.put("text", new Integer(SqlParserSymbols.KW_TEXT));

View File

@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.SqlParserUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.StringReader;
/**
 * Parser-level tests for CREATE JOB statements: one-time (AT) and cyclic
 * (EVERY ... [STARTS] [ENDS]) forms, plus rejection of invalid combinations.
 */
public class CreateJobStmtTest {

    @Test
    public void createOnceTimeJobStmt() throws Exception {
        // a one-time job: AT <time> with no STARTS/ENDS clause
        String sql = "CREATE JOB job1 ON SCHEDULER AT \"2023-02-15\" DO SELECT * FROM `address` ;";
        CreateJobStmt jobStmt = sqlParse(sql);
        Assertions.assertEquals("SELECT * FROM `address`", jobStmt.getStmt().toSql());
        // the DO body must itself be a parseable statement
        String badExecuteSql = "CREATE JOB job1 ON SCHEDULER AT \"2023-02-15\" DO selects * from address ;";
        Assertions.assertThrows(AnalysisException.class, () -> sqlParse(badExecuteSql));
        // AT and STARTS are mutually exclusive
        String badSql = "CREATE JOB job1 ON SCHEDULER AT \"2023-02-15\" STARTS \"2023-02-15\" DO selects * from address ;";
        Assertions.assertThrows(AnalysisException.class, () -> sqlParse(badSql));
    }

    /** Parses a single statement and casts it to CreateJobStmt. */
    private CreateJobStmt sqlParse(String sql) throws Exception {
        // same-package classes: no need for fully-qualified names
        SqlScanner input = new SqlScanner(new StringReader(sql));
        SqlParser parser = new SqlParser(input);
        return (CreateJobStmt) SqlParserUtils.getStmt(parser, 0);
    }

    @Test
    public void createCycleJob() throws Exception {
        // EVERY with STARTS only
        String sql = "CREATE JOB job1 ON SCHEDULER EVERY 1 SECOND STARTS \"2023-02-15\" DO SELECT * FROM `address` ;";
        CreateJobStmt jobStmt = sqlParse(sql);
        Assertions.assertEquals("SELECT * FROM `address`", jobStmt.getStmt().toSql());
        // EVERY with ENDS only
        sql = "CREATE JOB job1 ON SCHEDULER EVERY 1 SECOND ENDS \"2023-02-15\" DO SELECT * FROM `address` ;";
        jobStmt = sqlParse(sql);
        Assertions.assertEquals("SELECT * FROM `address`", jobStmt.getStmt().toSql());
        // EVERY with both STARTS and ENDS
        sql = "CREATE JOB job1 ON SCHEDULER EVERY 1 SECOND STARTS \"2023-02-15\" ENDS \"2023-02-16\" DO SELECT * FROM `address` ;";
        jobStmt = sqlParse(sql);
        Assertions.assertEquals("SELECT * FROM `address`", jobStmt.getStmt().toSql());
        // EVERY with neither bound
        sql = "CREATE JOB job1 ON SCHEDULER EVERY 1 SECOND DO SELECT * FROM `address` ;";
        jobStmt = sqlParse(sql);
        Assertions.assertEquals("SELECT * FROM `address`", jobStmt.getStmt().toSql());
        // invalid DO body is rejected
        String badExecuteSql = "CREATE JOB job1 ON SCHEDULER AT \"2023-02-15\" DO selects * from address ;";
        Assertions.assertThrows(AnalysisException.class, () -> sqlParse(badExecuteSql));
        // AT combined with STARTS is rejected
        String badSql = "CREATE JOB job1 ON SCHEDULER AT \"2023-02-15\" STARTS \"2023-02-15\" DO selects * from address ;";
        Assertions.assertThrows(AnalysisException.class, () -> sqlParse(badSql));
    }
}

View File

@ -17,11 +17,17 @@
package org.apache.doris.scheduler.disruptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.persist.EditLog;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.executor.JobExecutor;
import org.apache.doris.scheduler.job.AsyncJobManager;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.manager.AsyncJobManager;
import lombok.extern.slf4j.Slf4j;
import mockit.Expectations;
import mockit.Mocked;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@ -37,25 +43,42 @@ public class AsyncJobManagerTest {
AsyncJobManager asyncJobManager;
@Mocked
EditLog editLog;
private static AtomicInteger testExecuteCount = new AtomicInteger(0);
Job job = new Job("test", 6000L, null,
null, new TestExecutor());
@BeforeEach
public void init() {
job.setCycleJob(true);
job.setJobCategory(JobCategory.COMMON);
testExecuteCount.set(0);
asyncJobManager = new AsyncJobManager();
}
@Test
public void testCycleScheduler() {
public void testCycleScheduler(@Mocked Env env) throws DdlException {
setContext(env);
asyncJobManager.registerJob(job);
//consider the time of the first execution and give some buffer time
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> testExecuteCount.get() >= 3);
}
private void setContext(Env env) {
new Expectations() {
{
env.getEditLog();
result = editLog;
editLog.logCreateJob((Job) any);
}
};
}
@Test
public void testCycleSchedulerAndStop() {
public void testCycleSchedulerAndStop(@Mocked Env env) throws DdlException {
setContext(env);
asyncJobManager.registerJob(job);
long startTime = System.currentTimeMillis();
Awaitility.await().atMost(8, TimeUnit.SECONDS).until(() -> testExecuteCount.get() >= 1);
@ -63,27 +86,29 @@ public class AsyncJobManagerTest {
//consider the time of the first execution and give some buffer time
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> System.currentTimeMillis() >= startTime + 13000L);
Assertions.assertEquals(1, testExecuteCount.get());
}
@Test
public void testCycleSchedulerWithIncludeStartTimeAndEndTime() {
job.setStartTimestamp(System.currentTimeMillis() + 6000L);
public void testCycleSchedulerWithIncludeStartTimeAndEndTime(@Mocked Env env) throws DdlException {
setContext(env);
job.setStartTimeMs(System.currentTimeMillis() + 6000L);
long endTimestamp = System.currentTimeMillis() + 19000L;
job.setEndTimestamp(endTimestamp);
job.setEndTimeMs(endTimestamp);
asyncJobManager.registerJob(job);
//consider the time of the first execution and give some buffer time
Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> System.currentTimeMillis()
>= endTimestamp + 12000L);
Assertions.assertEquals(2, testExecuteCount.get());
Assertions.assertEquals(3, testExecuteCount.get());
}
@Test
public void testCycleSchedulerWithIncludeEndTime() {
public void testCycleSchedulerWithIncludeEndTime(@Mocked Env env) throws DdlException {
setContext(env);
long endTimestamp = System.currentTimeMillis() + 13000;
job.setEndTimestamp(endTimestamp);
job.setEndTimeMs(endTimestamp);
asyncJobManager.registerJob(job);
//consider the time of the first execution and give some buffer time
Awaitility.await().atMost(36, TimeUnit.SECONDS).until(() -> System.currentTimeMillis()
>= endTimestamp + 12000L);
@ -91,10 +116,26 @@ public class AsyncJobManagerTest {
}
@Test
public void testCycleSchedulerWithIncludeStartTime() {
public void testCycleSchedulerWithIncludeStartTime(@Mocked Env env) throws DdlException {
setContext(env);
long startTimestamp = System.currentTimeMillis() + 6000L;
job.setStartTimestamp(startTimestamp);
job.setStartTimeMs(startTimestamp);
asyncJobManager.registerJob(job);
//consider the time of the first execution and give some buffer time
Awaitility.await().atMost(14, TimeUnit.SECONDS).until(() -> System.currentTimeMillis()
>= startTimestamp + 7000L);
Assertions.assertEquals(2, testExecuteCount.get());
}
@Test
public void testOneTimeJob(@Mocked Env env) throws DdlException {
setContext(env);
long startTimestamp = System.currentTimeMillis() + 3000L;
job.setIntervalMs(0L);
job.setStartTimeMs(startTimestamp);
job.setCycleJob(false);
asyncJobManager.registerJob(job);
//consider the time of the first execution and give some buffer time
Awaitility.await().atMost(14, TimeUnit.SECONDS).until(() -> System.currentTimeMillis()
@ -109,7 +150,7 @@ public class AsyncJobManagerTest {
class TestExecutor implements JobExecutor<Boolean> {
@Override
public Boolean execute() {
public Boolean execute(Job job) {
log.info("test execute count:{}", testExecuteCount.incrementAndGet());
return true;
}

View File

@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.scheduler.disruptor;
import org.apache.doris.scheduler.common.IntervalUnit;
import org.apache.doris.scheduler.constants.JobCategory;
import org.apache.doris.scheduler.executor.SqlJobExecutor;
import org.apache.doris.scheduler.job.Job;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
/**
 * Serialization round-trip test for {@link Job}: writes a fully populated job
 * to a temp file and verifies key fields survive a read-back.
 */
public class JobTest {

    private static Job job;

    @BeforeAll
    public static void init() {
        SqlJobExecutor sqlJobExecutor = new SqlJobExecutor("insert into test values(1);");
        job = new Job("insertTest", 1000L, System.currentTimeMillis(), System.currentTimeMillis() + 100000, sqlJobExecutor);
        job.setCycleJob(true);
        job.setComment("test");
        job.setOriginInterval(10L);
        job.setIntervalUnit(IntervalUnit.SECOND);
        job.setUser("root");
        job.setDbName("test");
        job.setTimezone("Asia/Shanghai");
        job.setJobCategory(JobCategory.SQL);
    }

    @Test
    public void testSerialization() throws IOException {
        Path path = Paths.get("./scheduler-jobs");
        Files.deleteIfExists(path);
        Files.createFile(path);
        // try-with-resources: the original leaked the DataInputStream entirely
        // and closed the DataOutputStream only on the happy path
        try (DataOutputStream dos = new DataOutputStream(Files.newOutputStream(path))) {
            job.write(dos);
            dos.flush();
        }
        Job readJob;
        try (DataInputStream dis = new DataInputStream(Files.newInputStream(path))) {
            readJob = Job.readFields(dis);
        }
        Assertions.assertEquals(job.getJobName(), readJob.getJobName());
        Assertions.assertEquals(job.getTimezone(), readJob.getTimezone());
    }

    @AfterAll
    public static void clean() throws IOException {
        Path path = Paths.get("./scheduler-jobs");
        Files.deleteIfExists(path);
    }
}

View File

@ -18,8 +18,8 @@
package org.apache.doris.scheduler.disruptor;
import org.apache.doris.scheduler.executor.JobExecutor;
import org.apache.doris.scheduler.job.AsyncJobManager;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.manager.AsyncJobManager;
import mockit.Expectations;
import mockit.Injectable;
@ -30,7 +30,6 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class TimerTaskDisruptorTest {
@ -56,7 +55,7 @@ public class TimerTaskDisruptorTest {
asyncJobManager.getJob(anyLong);
result = job;
}};
timerTaskDisruptor.tryPublish(job.getJobId(), UUID.randomUUID().getMostSignificantBits());
timerTaskDisruptor.tryPublish(job.getJobId());
Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> testEventExecuteFlag);
Assertions.assertTrue(testEventExecuteFlag);
}
@ -64,7 +63,7 @@ public class TimerTaskDisruptorTest {
class TestExecutor implements JobExecutor<Boolean> {
@Override
public Boolean execute() {
public Boolean execute(Job job) {
testEventExecuteFlag = true;
return true;
}