diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 1b4c61a866..730aa08680 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1520,6 +1520,9 @@ public class Config extends ConfigBase { @ConfField public static boolean enable_pipeline_load = false; + @ConfField + public static int scheduler_job_task_max_num = 10; + // enable_workload_group should be immutable and temporarily set to mutable during the development test phase @ConfField(mutable = true, expType = ExperimentalType.EXPERIMENTAL) public static boolean enable_workload_group = false; diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 971d45d1f0..3c7803f598 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -260,6 +260,7 @@ terminal String KW_AUTO_INCREMENT, KW_AS, KW_ASC, + KW_AT, KW_AUTHORS, KW_BACKEND, KW_BACKENDS, @@ -342,6 +343,7 @@ terminal String KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DIV, + KW_DO, KW_DOUBLE, KW_DROP, KW_DROPP, @@ -352,11 +354,13 @@ terminal String KW_ENCRYPTKEY, KW_ENCRYPTKEYS, KW_END, + KW_ENDS, KW_ENGINE, KW_ENGINES, KW_ENTER, KW_ERRORS, KW_EVENTS, + KW_EVERY, KW_EXCEPT, KW_EXCLUDE, KW_EXISTS, @@ -423,6 +427,7 @@ terminal String KW_ISOLATION, KW_INVERTED, KW_JOB, + KW_JOBS, KW_JOIN, KW_JSON, KW_JSONB, @@ -551,6 +556,7 @@ terminal String KW_ROWS, KW_S3, KW_SAMPLE, + KW_SCHEDULER, KW_SCHEMA, KW_SCHEMAS, KW_SECOND, @@ -569,11 +575,13 @@ terminal String KW_SPLIT, KW_SQL_BLOCK_RULE, KW_START, + KW_STARTS, KW_STATS, KW_STATUS, KW_STOP, KW_STORAGE, KW_STREAM, + KW_STREAMING, KW_STRING, KW_STRUCT, KW_SUM, @@ -587,6 +595,7 @@ terminal String KW_TABLET, KW_TABLETS, KW_TASK, + KW_TASKS, KW_TEMPORARY, KW_TERMINATED, KW_TEXT, @@ -662,6 +671,7 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt, create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt, show_routine_load_stmt, show_routine_load_task_stmt, show_create_routine_load_stmt, show_create_load_stmt, show_create_reporitory_stmt, describe_stmt, alter_stmt, + create_job_stmt,pause_job_stmt,resume_job_stmt,stop_job_stmt,show_job_stmt, use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt, switch_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt, import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt, @@ -796,6 +806,8 @@ nonterminal Qualifier opt_set_qualifier; nonterminal Operation set_op; nonterminal ArrayList opt_common_hints; nonterminal String optional_on_ident; +nonterminal String opt_job_starts; +nonterminal String opt_job_ends; nonterminal LoadTask.MergeType opt_merge_type, opt_with_merge_type; @@ -1128,6 +1140,16 @@ stmt ::= {: RESULT = stmt; :} | show_create_routine_load_stmt : stmt {: RESULT = stmt; :} + | create_job_stmt : stmt + {: RESULT = stmt; :} + | pause_job_stmt : stmt + {: RESULT = stmt; :} + | show_job_stmt : stmt + {: RESULT = stmt; :} + | stop_job_stmt : stmt + {: RESULT = stmt; :} + | resume_job_stmt : stmt + {: RESULT = stmt; :} | show_create_load_stmt : stmt {: RESULT = stmt; :} | show_create_reporitory_stmt : stmt @@ -2611,7 +2633,75 @@ resource_desc ::= RESULT = new ResourceDesc(resourceName, properties); :} ; +create_job_stmt ::= + KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_EVERY INTEGER_LITERAL:time_interval ident:time_unit opt_job_starts:startsTime opt_job_ends:endsTime opt_comment:comment KW_DO stmt:executeSql + {: + CreateJobStmt stmt = new CreateJobStmt(jobLabel,null,false,time_interval,time_unit, startsTime, endsTime,comment,executeSql); + RESULT = stmt; + :} + | KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_STREAMING KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql + {: + CreateJobStmt stmt = new CreateJobStmt(jobLabel,atTime,true,null,null,null,null,comment,executeSql); + RESULT = stmt; + :} + | KW_CREATE KW_JOB job_label:jobLabel KW_ON KW_SCHEDULER KW_AT STRING_LITERAL:atTime opt_comment:comment KW_DO stmt:executeSql + {: + CreateJobStmt stmt = new CreateJobStmt(jobLabel,atTime,false,null,null,null,null,comment,executeSql); + RESULT = stmt; + :} + ; + opt_job_starts ::= + {: + RESULT = null; + :} + | KW_STARTS STRING_LITERAL:startTime + {: + RESULT = startTime; + :} + ; + + opt_job_ends ::= + {: + RESULT = null; + :} + | KW_ENDS STRING_LITERAL:endTime + {: + RESULT = endTime; + :} + ; +resume_job_stmt ::= + KW_RESUME KW_JOB job_label:jobLabel + {: + RESULT = new ResumeJobStmt(jobLabel); + :} + ; +show_job_stmt ::= + KW_SHOW KW_JOBS + {: + RESULT = new ShowJobStmt(null,null); + :} + | KW_SHOW KW_JOB KW_FOR job_label:jobLabel + {: + RESULT = new ShowJobStmt(jobLabel,null); + :} + | KW_SHOW KW_JOB KW_TASKS KW_FOR job_label:jobLabel + {: + RESULT = new ShowJobTaskStmt(jobLabel); + :} + ; +pause_job_stmt ::= + KW_PAUSE KW_JOB KW_FOR job_label:jobLabel + {: + RESULT = new PauseJobStmt(jobLabel); + :} + ; +stop_job_stmt ::= + KW_STOP KW_JOB KW_FOR job_label:jobLabel + {: + RESULT = new StopJobStmt(jobLabel); + :} + ; // Routine load statement create_routine_load_stmt ::= KW_CREATE KW_ROUTINE KW_LOAD job_label:jobLabel optional_on_ident:tableName @@ -7347,6 +7437,8 @@ keyword ::= {: RESULT = id; :} | KW_DISTINCTPCSA:id {: RESULT = id; :} + | KW_DO:id + {: RESULT = id; :} | KW_BUCKETS:id {: RESULT = id; :} | KW_FAST:id @@ -7411,6 +7503,8 @@ keyword ::= {: RESULT = id; :} | KW_JOB:id {: RESULT = id; :} + | KW_JOBS:id + {: RESULT = id; :} | KW_JSON:id {: RESULT = id; :} | KW_JSONB:id @@ -7523,7 +7617,7 @@ keyword ::= {: RESULT = id; :} | KW_ROLLBACK:id {: RESULT = id; :} - | KW_ROLLUP:id + | KW_SCHEDULER:id {: RESULT = id; :} | KW_SCHEMA:id {: RESULT = id; :} @@ -7549,6 +7643,8 @@ keyword ::= {: RESULT = id; :} | KW_STREAM:id {: RESULT = id; :} + | KW_STREAMING:id + {: RESULT = id; :} | KW_STRUCT:id {: RESULT = id; :} | KW_STRING:id @@ -7615,8 +7711,20 @@ keyword ::= {: RESULT = id; :} | KW_TASK:id {: RESULT = id; :} + | KW_TASKS:id + {: RESULT = id; :} | KW_ROUTINE:id {: RESULT = id; :} + | KW_EVERY:id + {: RESULT = id; :} + | KW_AT:id + {: RESULT = id; :} + | KW_ROLLUP:id + {: RESULT = id; :} + | KW_STARTS:id + {: RESULT = id; :} + | KW_ENDS:id + {: RESULT = id; :} | KW_PAUSE:id {: RESULT = id; :} | KW_RESUME:id diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java new file mode 100644 index 0000000000..b6736b2730 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.scheduler.common.IntervalUnit; +import org.apache.doris.scheduler.constants.JobCategory; +import org.apache.doris.scheduler.constants.JobStatus; +import org.apache.doris.scheduler.executor.SqlJobExecutor; +import org.apache.doris.scheduler.job.Job; + +import com.google.common.collect.ImmutableSet; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +/** + * syntax: + * CREATE + * [DEFINER = user] + * JOB + * event_name + * ON SCHEDULE schedule + * [COMMENT 'string'] + * DO event_body; + * schedule: { + * [STREAMING] AT timestamp + * | EVERY interval + * [STARTS timestamp ] + * [ENDS timestamp ] + * } + * interval: + * quantity { DAY | HOUR | MINUTE | + * WEEK | SECOND } + * + */ +@Slf4j +public class CreateJobStmt extends DdlStmt { + + + @Getter + private StatementBase stmt; + + @Getter + private Job job; + + private final LabelName labelName; + + private final String onceJobStartTimestamp; + + private final Long interval; + + private final String intervalTimeUnit; + + private final String startsTimeStamp; + + private final String endsTimeStamp; + + private final String comment; + + private String timezone = TimeUtils.DEFAULT_TIME_ZONE; + + private static final ImmutableSet supportStmtClassName = new ImmutableSet.Builder() + .add(NativeInsertStmt.class.getName()).build(); + + public CreateJobStmt(LabelName labelName, String onceJobStartTimestamp, Boolean isStreamingJob, + Long interval, String intervalTimeUnit, + String startsTimeStamp, String endsTimeStamp, String comment, StatementBase doStmt) { + this.labelName = labelName; + this.onceJobStartTimestamp = onceJobStartTimestamp; + this.interval = interval; + this.intervalTimeUnit = intervalTimeUnit; + this.startsTimeStamp = startsTimeStamp; + this.endsTimeStamp = endsTimeStamp; + this.comment = comment; + this.stmt = doStmt; + this.job = new Job(); + job.setStreamingJob(isStreamingJob); + } + + private String parseExecuteSql(String sql) throws AnalysisException { + sql = sql.toLowerCase(); + int executeSqlIndex = sql.indexOf(" do "); + String executeSql = sql.substring(executeSqlIndex + 4).trim(); + if (StringUtils.isBlank(executeSql)) { + throw new AnalysisException("execute sql has invalid format"); + } + return executeSql; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + checkAuth(); + labelName.analyze(analyzer); + String dbName = labelName.getDbName(); + Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName); + job.setDbName(labelName.getDbName()); + job.setJobName(labelName.getLabelName()); + if (StringUtils.isNotBlank(onceJobStartTimestamp)) { + analyzerOnceTimeJob(); + } else { + analyzerCycleJob(); + } + if (ConnectContext.get() != null) { + timezone = ConnectContext.get().getSessionVariable().getTimeZone(); + } + timezone = TimeUtils.checkTimeZoneValidAndStandardize(timezone); + job.setTimezone(timezone); + job.setComment(comment); + //todo support user define + job.setUser("root"); + job.setJobStatus(JobStatus.RUNNING); + job.setJobCategory(JobCategory.SQL); + analyzerSqlStmt(); + } + + private void checkAuth() throws AnalysisException { + UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity(); + if (!userIdentity.isRootUser()) { + throw new AnalysisException("only root user can create job"); + } + } + + private void analyzerSqlStmt() throws UserException { + if (!supportStmtClassName.contains(stmt.getClass().getName())) { + throw new AnalysisException("Not support stmt type"); + } + stmt.analyze(analyzer); + String originStmt = getOrigStmt().originStmt; + String executeSql = parseExecuteSql(originStmt); + SqlJobExecutor sqlJobExecutor = new SqlJobExecutor(executeSql); + job.setExecutor(sqlJobExecutor); + } + + + private void analyzerCycleJob() throws UserException { + job.setCycleJob(true); + if (null == interval) { + throw new AnalysisException("interval is null"); + } + if (interval <= 0) { + throw new AnalysisException("interval must be greater than 0"); + } + + if (StringUtils.isBlank(intervalTimeUnit)) { + throw new AnalysisException("intervalTimeUnit is null"); + } + try { + IntervalUnit intervalUnit = IntervalUnit.valueOf(intervalTimeUnit.toUpperCase()); + job.setIntervalUnit(intervalUnit); + long intervalTimeMs = intervalUnit.getParameterValue(interval); + job.setIntervalMs(intervalTimeMs); + job.setOriginInterval(interval); + } catch (IllegalArgumentException e) { + throw new AnalysisException("interval time unit is not valid, we only support second,minute,hour,day,week"); + } + if (StringUtils.isNotBlank(startsTimeStamp)) { + long startsTimeMillis = TimeUtils.timeStringToLong(startsTimeStamp); + if (startsTimeMillis < System.currentTimeMillis()) { + throw new AnalysisException("starts time must be greater than current time"); + } + job.setStartTimeMs(startsTimeMillis); + } + if (StringUtils.isNotBlank(endsTimeStamp)) { + long endTimeMillis = TimeUtils.timeStringToLong(endsTimeStamp); + if (endTimeMillis < System.currentTimeMillis()) { + throw new AnalysisException("ends time must be greater than current time"); + } + job.setEndTimeMs(endTimeMillis); + } + if (job.getStartTimeMs() > 0 && job.getEndTimeMs() > 0 + && (job.getEndTimeMs() - job.getStartTimeMs() < job.getIntervalMs())) { + throw new AnalysisException("ends time must be greater than start time and interval time"); + } + } + + + private void analyzerOnceTimeJob() throws UserException { + job.setCycleJob(false); + + job.setIntervalMs(0L); + + long executeAtTimeMillis = TimeUtils.timeStringToLong(onceJobStartTimestamp); + if (executeAtTimeMillis < System.currentTimeMillis()) { + throw new AnalysisException("job time stamp must be greater than current time"); + } + job.setStartTimeMs(executeAtTimeMillis); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java new file mode 100644 index 0000000000..dd23e61b9a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Strings; + +/** + * syntax: + * PAUSE JOB FOR [database.]name + * we can pause a job by jobName + * it's only running job can be paused, and it will be paused immediately + * paused job can be resumed by RESUME EVENT FOR jobName + */ +public class PauseJobStmt extends DdlStmt { + + private final LabelName labelName; + private String db; + + public PauseJobStmt(LabelName labelName) { + this.labelName = labelName; + } + + public boolean isAll() { + return labelName == null; + } + + public String getName() { + return labelName.getLabelName(); + } + + public String getDbFullName() { + return db; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + checkAuth(); + if (labelName != null) { + labelName.analyze(analyzer); + db = labelName.getDbName(); + } else { + if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); + } + db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb()); + } + } + + private void checkAuth() throws AnalysisException { + UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity(); + if (!userIdentity.isRootUser()) { + throw new AnalysisException("only root user can operate"); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeJobStmt.java new file mode 100644 index 0000000000..8cc305bf84 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeJobStmt.java @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Strings; + +public class ResumeJobStmt extends DdlStmt { + + private final LabelName labelName; + + private String db; + + public ResumeJobStmt(LabelName labelName) { + this.labelName = labelName; + } + + public boolean isAll() { + return labelName == null; + } + + public String getName() { + return labelName.getLabelName(); + } + + public String getDbFullName() { + return db; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + super.analyze(analyzer); + checkAuth(); + if (labelName != null) { + labelName.analyze(analyzer); + db = labelName.getDbName(); + } else { + if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); + } + db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb()); + } + } + + private void checkAuth() throws AnalysisException { + UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity(); + if (!userIdentity.isRootUser()) { + throw new AnalysisException("only root user can operate"); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java new file mode 100644 index 0000000000..56fc916b2a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobStmt.java @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ShowResultSetMetaData; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * SHOW JOB [FOR JobName] + * eg: show event + * return all job in connection db + * eg: show event for test + * return job named test in connection db + */ +public class ShowJobStmt extends ShowStmt { + + private static final ImmutableList TITLE_NAMES = + new ImmutableList.Builder() + .add("Id") + .add("Db") + .add("Name") + .add("Definer") + .add("TimeZone") + .add("ExecuteType") + .add("ExecuteAt") + .add("ExecuteInterval") + .add("ExecuteIntervalUnit") + .add("Starts") + .add("Ends") + .add("Status") + .add("LastExecuteFinishTime") + .add("ErrorMsg") + .add("Comment") + .build(); + + private final LabelName labelName; + private String dbFullName; // optional + private String name; // optional + private String pattern; // optional + + public ShowJobStmt(LabelName labelName, String pattern) { + this.labelName = labelName; + this.pattern = pattern; + } + + public String getDbFullName() { + return dbFullName; + } + + public String getName() { + return name; + } + + public String getPattern() { + return pattern; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + checkAuth(); + checkLabelName(analyzer); + } + + private void checkAuth() throws AnalysisException { + UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity(); + if (!userIdentity.isRootUser()) { + throw new AnalysisException("only root user can operate"); + } + } + + private void checkLabelName(Analyzer analyzer) throws AnalysisException { + String dbName = labelName == null ? null : labelName.getDbName(); + if (Strings.isNullOrEmpty(dbName)) { + dbFullName = analyzer.getContext().getDatabase(); + if (Strings.isNullOrEmpty(dbFullName)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); + } + } else { + dbFullName = ClusterNamespace.getFullName(getClusterName(), dbName); + } + name = labelName == null ? null : labelName.getLabelName(); + } + + public static List getTitleNames() { + return TITLE_NAMES; + } + + @Override + public ShowResultSetMetaData getMetaData() { + ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); + + for (String title : TITLE_NAMES) { + builder.addColumn(new Column(title, ScalarType.createVarchar(30))); + } + return builder.build(); + } + + @Override + public RedirectStatus getRedirectStatus() { + return RedirectStatus.FORWARD_NO_SYNC; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java new file mode 100644 index 0000000000..d16fe2a9a2 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowJobTaskStmt.java @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ShowResultSetMetaData; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * SHOW JOB TASKS [FOR JobName] + */ +public class ShowJobTaskStmt extends ShowStmt { + + private static final ImmutableList TITLE_NAMES = + new ImmutableList.Builder() + .add("JobId") + .add("TaskId") + .add("StartTime") + .add("EndTime") + .add("Status") + .add("ErrorMsg") + .build(); + + private final LabelName labelName; + private String dbFullName; // optional + private String name; // optional + + public ShowJobTaskStmt(LabelName labelName) { + this.labelName = labelName; + } + + public String getDbFullName() { + return dbFullName; + } + + public String getName() { + return name; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + checkAuth(); + checkLabelName(analyzer); + } + + private void checkAuth() throws AnalysisException { + UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity(); + if (!userIdentity.isRootUser()) { + throw new AnalysisException("only root user can operate"); + } + } + + private void checkLabelName(Analyzer analyzer) throws AnalysisException { + String dbName = labelName == null ? null : labelName.getDbName(); + if (Strings.isNullOrEmpty(dbName)) { + dbFullName = analyzer.getContext().getDatabase(); + if (Strings.isNullOrEmpty(dbFullName)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); + } + } else { + dbFullName = ClusterNamespace.getFullName(getClusterName(), dbName); + } + if (null == labelName) { + throw new AnalysisException("Job name is null"); + } + name = labelName.getLabelName(); + } + + public static List getTitleNames() { + return TITLE_NAMES; + } + + @Override + public ShowResultSetMetaData getMetaData() { + ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); + + for (String title : TITLE_NAMES) { + builder.addColumn(new Column(title, ScalarType.createVarchar(30))); + } + return builder.build(); + } + + @Override + public RedirectStatus getRedirectStatus() { + return RedirectStatus.FORWARD_NO_SYNC; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StopJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StopJobStmt.java new file mode 100644 index 0000000000..fceba0d200 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StopJobStmt.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; + +/** + * syntax: + * STOP JOB FOR [database.]name + * only run job can be stopped, and stopped job can't be resumed + */ +public class StopJobStmt extends DdlStmt { + + private final LabelName labelName; + + public StopJobStmt(LabelName labelName) { + this.labelName = labelName; + } + + public String getName() { + return labelName.getLabelName(); + } + + public String getDbFullName() { + return labelName.getDbName(); + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + checkAuth(); + labelName.analyze(analyzer); + } + + private void checkAuth() throws AnalysisException { + UserIdentity userIdentity = ConnectContext.get().getCurrentUserIdentity(); + if (!userIdentity.isRootUser()) { + throw new AnalysisException("only root user can operate"); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 30fe9ab59c..5c9cfb3434 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -211,6 +211,10 @@ import org.apache.doris.qe.JournalObservable; import org.apache.doris.qe.VariableMgr; import org.apache.doris.resource.Tag; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; +import org.apache.doris.scheduler.AsyncJobRegister; +import org.apache.doris.scheduler.manager.AsyncJobManager; +import org.apache.doris.scheduler.manager.JobTaskManager; +import org.apache.doris.scheduler.registry.PersistentJobRegister; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.StatisticsAutoAnalyzer; @@ -330,6 +334,9 @@ public class Env { private CooldownConfHandler cooldownConfHandler; private MetastoreEventsProcessor metastoreEventsProcessor; + private PersistentJobRegister persistentJobRegister; + private AsyncJobManager asyncJobManager; + private JobTaskManager jobTaskManager; private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos private MasterDaemon txnCleaner; // To clean aborted or timeout txns private Daemon replayer; @@ -585,7 +592,9 @@ public class Env { this.cooldownConfHandler = new CooldownConfHandler(); } this.metastoreEventsProcessor = new MetastoreEventsProcessor(); - + this.jobTaskManager = new JobTaskManager(); + this.asyncJobManager = new AsyncJobManager(); + this.persistentJobRegister = new AsyncJobRegister(asyncJobManager); this.replayedJournalId = new AtomicLong(0L); this.stmtIdCounter = new AtomicLong(0L); this.isElectable = false; @@ -1957,6 +1966,30 @@ public class Env { return checksum; } + public long loadAsyncJobManager(DataInputStream in, long checksum) throws IOException { + asyncJobManager.readFields(in); + LOG.info("finished replay asyncJobMgr from image"); + return checksum; + } + + public long saveAsyncJobManager(CountingDataOutputStream out, long checksum) throws IOException { + asyncJobManager.write(out); + LOG.info("finished save analysisMgr to image"); + return checksum; + } + + public long loadJobTaskManager(DataInputStream in, long checksum) throws IOException { + jobTaskManager.readFields(in); + LOG.info("finished replay jobTaskMgr from image"); + return checksum; + } + + public long saveJobTaskManager(CountingDataOutputStream out, long checksum) throws IOException { + jobTaskManager.write(out); + LOG.info("finished save jobTaskMgr to image"); + return checksum; + } + public long loadResources(DataInputStream in, long checksum) throws IOException { resourceMgr = ResourceMgr.read(in); LOG.info("finished replay resources from image"); @@ -3726,6 +3759,18 @@ public class Env { return this.syncJobManager; } + public PersistentJobRegister getJobRegister() { + return persistentJobRegister; + } + + public AsyncJobManager getAsyncJobManager() { + return asyncJobManager; + } + + public JobTaskManager getJobTaskManager() { + return jobTaskManager; + } + public SmallFileMgr getSmallFileMgr() { return this.smallFileMgr; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java b/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java index 3417107a64..8719c0f001 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/CaseSensibility.java @@ -36,7 +36,8 @@ public enum CaseSensibility { RESOURCE(true), CONFIG(true), ROUTINE_LOAD(true), - WORKLOAD_GROUP(true); + WORKLOAD_GROUP(true), + JOB(true); private boolean caseSensitive; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LogKey.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LogKey.java index 5de87a7baa..ab146819ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LogKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LogKey.java @@ -22,5 +22,7 @@ public enum LogKey { ROUTINE_LOAD_TASK, LOAD_JOB, LOAD_TASK, - SYNC_JOB + SYNC_JOB, + SCHEDULER_JOB, + SCHEDULER_TASK } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 5f87e553ef..43c8227761 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -118,6 +118,8 @@ import org.apache.doris.policy.DropPolicyLog; import org.apache.doris.policy.Policy; import org.apache.doris.policy.StoragePolicy; import org.apache.doris.resource.workloadgroup.WorkloadGroup; +import org.apache.doris.scheduler.job.Job; +import org.apache.doris.scheduler.job.JobTask; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; @@ -518,6 +520,21 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_UPDATE_SCHEDULER_JOB: + case OperationType.OP_DELETE_SCHEDULER_JOB: + case OperationType.OP_CREATE_SCHEDULER_JOB: { + Job job = Job.readFields(in); + data = job; + isRead = true; + break; + } + case OperationType.OP_CREATE_SCHEDULER_TASK: + case OperationType.OP_DELETE_SCHEDULER_TASK: { + JobTask task = JobTask.readFields(in); + data = task; + isRead = true; + break; + } case OperationType.OP_CREATE_LOAD_JOB: { data = org.apache.doris.load.loadv2.LoadJob.read(in); isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 9b5a54323e..356eb85daa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -81,6 +81,8 @@ import org.apache.doris.policy.DropPolicyLog; import org.apache.doris.policy.Policy; import org.apache.doris.policy.StoragePolicy; import org.apache.doris.resource.workloadgroup.WorkloadGroup; +import org.apache.doris.scheduler.job.Job; +import org.apache.doris.scheduler.job.JobTask; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; @@ -644,6 +646,31 @@ public class EditLog { Env.getCurrentEnv().getRoutineLoadManager().replayCreateRoutineLoadJob(routineLoadJob); break; } + case OperationType.OP_CREATE_SCHEDULER_JOB: { + Job job = (Job) journal.getData(); + Env.getCurrentEnv().getAsyncJobManager().replayCreateJob(job); + break; + } + case OperationType.OP_UPDATE_SCHEDULER_JOB: { + Job job = (Job) journal.getData(); + Env.getCurrentEnv().getAsyncJobManager().replayUpdateJob(job); + break; + } + case OperationType.OP_DELETE_SCHEDULER_JOB: { + Job job = (Job) journal.getData(); + Env.getCurrentEnv().getAsyncJobManager().replayDeleteJob(job); + break; + } + case OperationType.OP_CREATE_SCHEDULER_TASK: { + JobTask task = (JobTask) journal.getData(); + Env.getCurrentEnv().getJobTaskManager().replayCreateTask(task); + break; + } + case OperationType.OP_DELETE_SCHEDULER_TASK: { + JobTask task = (JobTask) journal.getData(); + Env.getCurrentEnv().getJobTaskManager().replayDeleteTask(task); + break; + } case OperationType.OP_CHANGE_ROUTINE_LOAD_JOB: { RoutineLoadOperation operation = (RoutineLoadOperation) journal.getData(); Env.getCurrentEnv().getRoutineLoadManager().replayChangeRoutineLoadJob(operation); @@ -1536,6 +1563,26 @@ public class EditLog { logEdit(OperationType.OP_CREATE_ROUTINE_LOAD_JOB, routineLoadJob); } + public void logCreateJob(Job job) { + logEdit(OperationType.OP_CREATE_SCHEDULER_JOB, job); + } + + public void logUpdateJob(Job job) { + logEdit(OperationType.OP_UPDATE_SCHEDULER_JOB, job); + } + + public void logCreateJobTask(JobTask jobTask) { + logEdit(OperationType.OP_CREATE_SCHEDULER_TASK, jobTask); + } + + public void logDeleteJobTask(JobTask jobTask) { + logEdit(OperationType.OP_DELETE_SCHEDULER_TASK, jobTask); + } + + public void logDeleteJob(Job job) { + logEdit(OperationType.OP_DELETE_SCHEDULER_JOB, job); + } + public void logOpRoutineLoadJob(RoutineLoadOperation routineLoadOperation) { logEdit(OperationType.OP_CHANGE_ROUTINE_LOAD_JOB, routineLoadOperation); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 2b935e74c5..26fec1f296 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -313,6 +313,17 @@ public class OperationType { // change an auto increment id for a column public static final short OP_UPDATE_AUTO_INCREMENT_ID = 437; + // scheduler job + public static final short OP_CREATE_SCHEDULER_JOB = 450; + + public static final short OP_UPDATE_SCHEDULER_JOB = 451; + + public static final short OP_DELETE_SCHEDULER_JOB = 452; + + public static final short OP_CREATE_SCHEDULER_TASK = 453; + public static final short OP_DELETE_SCHEDULER_TASK = 454; + + /** * Get opcode name by op code. **/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 055fb86495..7024bf4216 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -81,6 +81,8 @@ import org.apache.doris.load.sync.canal.CanalSyncJob; import org.apache.doris.policy.Policy; import org.apache.doris.policy.RowPolicy; import org.apache.doris.policy.StoragePolicy; +import org.apache.doris.scheduler.executor.JobExecutor; +import org.apache.doris.scheduler.executor.SqlJobExecutor; import org.apache.doris.system.BackendHbResponse; import org.apache.doris.system.BrokerHbResponse; import org.apache.doris.system.FrontendHbResponse; @@ -211,6 +213,10 @@ public class GsonUtils { RuntimeTypeAdapterFactory.of( AbstractDataSourceProperties.class, "clazz") .registerSubtype(KafkaDataSourceProperties.class, KafkaDataSourceProperties.class.getSimpleName()); + private static RuntimeTypeAdapterFactory jobExecutorRuntimeTypeAdapterFactory = + RuntimeTypeAdapterFactory.of( + JobExecutor.class, "clazz") + .registerSubtype(SqlJobExecutor.class, SqlJobExecutor.class.getSimpleName()); private static RuntimeTypeAdapterFactory dbTypeAdapterFactory = RuntimeTypeAdapterFactory.of( DatabaseIf.class, "clazz") @@ -264,10 +270,25 @@ public class GsonUtils { .registerTypeAdapterFactory(partitionInfoTypeAdapterFactory) .registerTypeAdapterFactory(hbResponseTypeAdapterFactory) .registerTypeAdapterFactory(rdsTypeAdapterFactory) + .registerTypeAdapterFactory(jobExecutorRuntimeTypeAdapterFactory) .registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()) .registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter()) .registerTypeAdapter(PartitionKey.class, new PartitionKey.PartitionKeySerializer()) - .registerTypeAdapter(Range.class, new RangeUtils.RangeSerializer()); + .registerTypeAdapter(Range.class, new RangeUtils.RangeSerializer()).setExclusionStrategies( + new ExclusionStrategy() { + @Override + public boolean shouldSkipField(FieldAttributes f) { + return false; + } + + @Override + public boolean shouldSkipClass(Class clazz) { + /* due to java.lang.IllegalArgumentException: com.lmax.disruptor.RingBuffer + declares multiple + JSON fields named p1 */ + return clazz.getName().startsWith("com.lmax.disruptor.RingBuffer"); + } + }); private static final GsonBuilder GSON_BUILDER_PRETTY_PRINTING = GSON_BUILDER.setPrettyPrinting(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java index bdc3a5a224..885cdd0d89 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java @@ -230,6 +230,18 @@ public class MetaPersistMethod { metaPersistMethod.writeMethod = Env.class.getDeclaredMethod("saveAnalysisMgr", CountingDataOutputStream.class, long.class); break; + case "AsyncJobManager": + metaPersistMethod.readMethod = + Env.class.getDeclaredMethod("loadAsyncJobManager", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Env.class.getDeclaredMethod("saveAsyncJobManager", CountingDataOutputStream.class, long.class); + break; + case "JobTaskManager": + metaPersistMethod.readMethod = + Env.class.getDeclaredMethod("loadJobTaskManager", DataInputStream.class, long.class); + metaPersistMethod.writeMethod = + Env.class.getDeclaredMethod("saveJobTaskManager", CountingDataOutputStream.class, long.class); + break; default: break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java index 6e99a6757f..3d2aa8b0eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java @@ -39,7 +39,7 @@ public class PersistMetaModules { "globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler", "paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles", "plugins", "deleteHandler", "sqlBlockRule", "policy", "mtmvJobManager", "globalFunction", "workloadGroups", - "binlogs", "resourceGroups", "AnalysisMgr"); + "binlogs", "resourceGroups", "AnalysisMgr", "AsyncJobManager", "JobTaskManager"); // Modules in this list is deprecated and will not be saved in meta file. (also should not be in MODULE_NAMES) public static final ImmutableList DEPRECATED_MODULE_NAMES = ImmutableList.of( diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 15b7bcc883..81a69bd9e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -58,6 +58,7 @@ import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreateEncryptKeyStmt; import org.apache.doris.analysis.CreateFileStmt; import org.apache.doris.analysis.CreateFunctionStmt; +import org.apache.doris.analysis.CreateJobStmt; import org.apache.doris.analysis.CreateMaterializedViewStmt; import org.apache.doris.analysis.CreateMultiTableMaterializedViewStmt; import org.apache.doris.analysis.CreatePolicyStmt; @@ -93,6 +94,7 @@ import org.apache.doris.analysis.DropWorkloadGroupStmt; import org.apache.doris.analysis.GrantStmt; import org.apache.doris.analysis.InstallPluginStmt; import org.apache.doris.analysis.KillAnalysisJobStmt; +import org.apache.doris.analysis.PauseJobStmt; import org.apache.doris.analysis.PauseRoutineLoadStmt; import org.apache.doris.analysis.PauseSyncJobStmt; import org.apache.doris.analysis.RecoverDbStmt; @@ -104,10 +106,12 @@ import org.apache.doris.analysis.RefreshLdapStmt; import org.apache.doris.analysis.RefreshMaterializedViewStmt; import org.apache.doris.analysis.RefreshTableStmt; import org.apache.doris.analysis.RestoreStmt; +import org.apache.doris.analysis.ResumeJobStmt; import org.apache.doris.analysis.ResumeRoutineLoadStmt; import org.apache.doris.analysis.ResumeSyncJobStmt; import org.apache.doris.analysis.RevokeStmt; import org.apache.doris.analysis.SetUserPropertyStmt; +import org.apache.doris.analysis.StopJobStmt; import org.apache.doris.analysis.StopRoutineLoadStmt; import org.apache.doris.analysis.StopSyncJobStmt; import org.apache.doris.analysis.SyncStmt; @@ -119,6 +123,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.util.ProfileManager; import org.apache.doris.load.sync.SyncJobManager; import org.apache.doris.persist.CleanQueryStatsInfo; +import org.apache.doris.scheduler.constants.JobCategory; import org.apache.doris.statistics.StatisticsRepository; import org.apache.logging.log4j.LogManager; @@ -182,6 +187,17 @@ public class DdlExecutor { env.getRoutineLoadManager().stopRoutineLoadJob((StopRoutineLoadStmt) ddlStmt); } else if (ddlStmt instanceof AlterRoutineLoadStmt) { env.getRoutineLoadManager().alterRoutineLoadJob((AlterRoutineLoadStmt) ddlStmt); + } else if (ddlStmt instanceof CreateJobStmt) { + env.getJobRegister().registerJob((((CreateJobStmt) ddlStmt).getJob())); + } else if (ddlStmt instanceof StopJobStmt) { + StopJobStmt stmt = (StopJobStmt) ddlStmt; + env.getJobRegister().stopJob(stmt.getDbFullName(), stmt.getName(), JobCategory.SQL); + } else if (ddlStmt instanceof PauseJobStmt) { + PauseJobStmt stmt = (PauseJobStmt) ddlStmt; + env.getJobRegister().pauseJob(stmt.getDbFullName(), stmt.getName(), JobCategory.SQL); + } else if (ddlStmt instanceof ResumeJobStmt) { + ResumeJobStmt stmt = (ResumeJobStmt) ddlStmt; + env.getJobRegister().resumeJob(stmt.getDbFullName(), stmt.getName(), JobCategory.SQL); } else if (ddlStmt instanceof DeleteStmt) { env.getDeleteHandler().process((DeleteStmt) ddlStmt); } else if (ddlStmt instanceof CreateUserStmt) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 322dcdf500..0f1c3e8bd8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -62,6 +62,8 @@ import org.apache.doris.analysis.ShowFrontendsStmt; import org.apache.doris.analysis.ShowFunctionsStmt; import org.apache.doris.analysis.ShowGrantsStmt; import org.apache.doris.analysis.ShowIndexStmt; +import org.apache.doris.analysis.ShowJobStmt; +import org.apache.doris.analysis.ShowJobTaskStmt; import org.apache.doris.analysis.ShowLastInsertStmt; import org.apache.doris.analysis.ShowLoadProfileStmt; import org.apache.doris.analysis.ShowLoadStmt; @@ -188,6 +190,9 @@ import org.apache.doris.mtmv.MTMVJobManager; import org.apache.doris.mtmv.metadata.MTMVJob; import org.apache.doris.mtmv.metadata.MTMVTask; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.scheduler.constants.JobCategory; +import org.apache.doris.scheduler.job.Job; +import org.apache.doris.scheduler.job.JobTask; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Histogram; @@ -213,6 +218,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableSetMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -423,6 +429,10 @@ public class ShowExecutor { handleShowBuildIndexStmt(); } else if (stmt instanceof ShowAnalyzeTaskStatus) { handleShowAnalyzeTaskStatus(); + } else if (stmt instanceof ShowJobStmt) { + handleShowJob(); + } else if (stmt instanceof ShowJobTaskStmt) { + handleShowJobTask(); } else { handleEmtpy(); } @@ -1378,6 +1388,61 @@ public class ShowExecutor { resultSet = new ShowResultSet(showWarningsStmt.getMetaData(), rows); } + private void handleShowJobTask() { + ShowJobTaskStmt showJobTaskStmt = (ShowJobTaskStmt) stmt; + List> rows = Lists.newArrayList(); + List jobs = Env.getCurrentEnv().getJobRegister() + .getJobs(showJobTaskStmt.getDbFullName(), showJobTaskStmt.getName(), JobCategory.SQL, + null); + if (CollectionUtils.isEmpty(jobs)) { + resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows); + return; + } + long jobId = jobs.get(0).getJobId(); + List jobTasks = Env.getCurrentEnv().getJobTaskManager().getJobTasks(jobId); + if (CollectionUtils.isEmpty(jobTasks)) { + resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows); + return; + } + for (JobTask job : jobTasks) { + rows.add(job.getShowInfo()); + } + resultSet = new ShowResultSet(showJobTaskStmt.getMetaData(), rows); + } + + private void handleShowJob() throws AnalysisException { + ShowJobStmt showJobStmt = (ShowJobStmt) stmt; + List> rows = Lists.newArrayList(); + // if job exists + List jobList; + PatternMatcher matcher = null; + if (showJobStmt.getPattern() != null) { + matcher = PatternMatcherWrapper.createMysqlPattern(showJobStmt.getPattern(), + CaseSensibility.JOB.getCaseSensibility()); + } + jobList = Env.getCurrentEnv().getJobRegister() + .getJobs(showJobStmt.getDbFullName(), showJobStmt.getName(), JobCategory.SQL, + matcher); + + if (jobList.isEmpty()) { + resultSet = new ShowResultSet(showJobStmt.getMetaData(), rows); + return; + } + + // check auth + for (Job job : jobList) { + if (!Env.getCurrentEnv().getAccessManager() + .checkDbPriv(ConnectContext.get(), job.getDbName(), PrivPredicate.SHOW)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, + ConnectContext.get().getQualifiedUser(), job.getDbName()); + } + } + for (Job job : jobList) { + rows.add(job.getShowInfo()); + } + resultSet = new ShowResultSet(showJobStmt.getMetaData(), rows); + } + private void handleShowRoutineLoad() throws AnalysisException { ShowRoutineLoadStmt showRoutineLoadStmt = (ShowRoutineLoadStmt) stmt; List> rows = Lists.newArrayList(); @@ -2533,7 +2598,7 @@ public class ShowExecutor { public void handleShowCatalogs() throws AnalysisException { ShowCatalogStmt showStmt = (ShowCatalogStmt) stmt; resultSet = Env.getCurrentEnv().getCatalogMgr().showCatalogs(showStmt, ctx.getCurrentCatalog() != null - ? ctx.getCurrentCatalog().getName() : null); + ? ctx.getCurrentCatalog().getName() : null); } // Show create catalog @@ -2741,13 +2806,13 @@ public class ShowExecutor { resultSet = new ShowResultSet(showMetaData, resultRowSet); } - private void handleShowBuildIndexStmt() throws AnalysisException { + private void handleShowBuildIndexStmt() throws AnalysisException { ShowBuildIndexStmt showStmt = (ShowBuildIndexStmt) stmt; ProcNodeInterface procNodeI = showStmt.getNode(); Preconditions.checkNotNull(procNodeI); // List> rows = ((BuildIndexProcDir) procNodeI).fetchResult().getRows(); List> rows = ((BuildIndexProcDir) procNodeI).fetchResultByFilter(showStmt.getFilterMap(), - showStmt.getOrderPairs(), showStmt.getLimitElement()).getRows(); + showStmt.getOrderPairs(), showStmt.getLimitElement()).getRows(); resultSet = new ShowResultSet(showStmt.getMetaData(), rows); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/AsyncJobRegister.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/AsyncJobRegister.java index 59c64906e8..9118da7836 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/AsyncJobRegister.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/AsyncJobRegister.java @@ -17,14 +17,18 @@ package org.apache.doris.scheduler; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.PatternMatcher; +import org.apache.doris.scheduler.constants.JobCategory; import org.apache.doris.scheduler.executor.JobExecutor; -import org.apache.doris.scheduler.job.AsyncJobManager; import org.apache.doris.scheduler.job.Job; -import org.apache.doris.scheduler.registry.JobRegister; +import org.apache.doris.scheduler.manager.AsyncJobManager; +import org.apache.doris.scheduler.registry.PersistentJobRegister; import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.util.List; /** * This class registers timed scheduling events using the Netty time wheel algorithm to trigger events in a timely @@ -34,45 +38,70 @@ import java.io.IOException; * consumption model that does not guarantee strict timing accuracy. */ @Slf4j -public class AsyncJobRegister implements JobRegister { +public class AsyncJobRegister implements PersistentJobRegister { private final AsyncJobManager asyncJobManager; - public AsyncJobRegister() { - this.asyncJobManager = new AsyncJobManager(); + public AsyncJobRegister(AsyncJobManager asyncJobManager) { + this.asyncJobManager = asyncJobManager; } @Override - public Long registerJob(String name, Long intervalMs, JobExecutor executor) { + public Long registerJob(String name, Long intervalMs, JobExecutor executor) throws DdlException { return this.registerJob(name, intervalMs, null, null, executor); } @Override - public Long registerJob(String name, Long intervalMs, Long startTimeStamp, JobExecutor executor) { - return this.registerJob(name, intervalMs, startTimeStamp, null, executor); + public Long registerJob(String name, Long intervalMs, Long startTimeMs, JobExecutor executor) throws DdlException { + return this.registerJob(name, intervalMs, startTimeMs, null, executor); } @Override - public Long registerJob(String name, Long intervalMs, Long startTimeStamp, Long endTimeStamp, - JobExecutor executor) { + public Long registerJob(String name, Long intervalMs, Long startTimeMs, Long endTimeStamp, + JobExecutor executor) throws DdlException { - Job job = new Job(name, intervalMs, startTimeStamp, endTimeStamp, executor); + Job job = new Job(name, intervalMs, startTimeMs, endTimeStamp, executor); return asyncJobManager.registerJob(job); } @Override - public Boolean pauseJob(Long jobId) { - return asyncJobManager.pauseJob(jobId); + public Long registerJob(Job job) throws DdlException { + return asyncJobManager.registerJob(job); } @Override - public Boolean stopJob(Long jobId) { - return asyncJobManager.stopJob(jobId); + public void pauseJob(Long jobId) { + asyncJobManager.pauseJob(jobId); } @Override - public Boolean resumeJob(Long jobId) { - return asyncJobManager.resumeJob(jobId); + public void pauseJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException { + asyncJobManager.pauseJob(dbName, jobName, jobCategory); + } + + @Override + public void resumeJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException { + asyncJobManager.resumeJob(dbName, jobName, jobCategory); + } + + @Override + public void stopJob(Long jobId) { + asyncJobManager.stopJob(jobId); + } + + @Override + public void stopJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException { + asyncJobManager.stopJob(dbName, jobName, jobCategory); + } + + @Override + public void resumeJob(Long jobId) { + asyncJobManager.resumeJob(jobId); + } + + @Override + public List getJobs(String dbFullName, String jobName, JobCategory jobCategory, PatternMatcher matcher) { + return asyncJobManager.queryJob(dbFullName, jobName, jobCategory, matcher); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/common/IntervalUnit.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/common/IntervalUnit.java new file mode 100644 index 0000000000..27f10d8a3f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/common/IntervalUnit.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.scheduler.common; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +public enum IntervalUnit { + + SECOND("second", 0L, TimeUnit.SECONDS::toMillis), + MINUTE("minute", 0L, TimeUnit.MINUTES::toMillis), + HOUR("hour", 0L, TimeUnit.HOURS::toMillis), + DAY("day", 0L, TimeUnit.DAYS::toMillis), + WEEK("week", 0L, v -> TimeUnit.DAYS.toMillis(v * 7)); + private final String unit; + + public String getUnit() { + return unit; + } + + public static IntervalUnit fromString(String unit) { + for (IntervalUnit u : IntervalUnit.values()) { + if (u.unit.equalsIgnoreCase(unit)) { + return u; + } + } + return null; + } + + private final Object defaultValue; + + private final Function converter; + + IntervalUnit(String unit, Long defaultValue, Function converter) { + this.unit = unit; + this.defaultValue = defaultValue; + this.converter = converter; + } + + IntervalUnit getByName(String name) { + return Arrays.stream(IntervalUnit.values()) + .filter(config -> config.getUnit().equals(name)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Unknown configuration " + name)); + } + + public Long getParameterValue(Long param) { + return (Long) (param != null ? converter.apply(param) : defaultValue); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/JobRegisterFactory.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java similarity index 53% rename from fe/fe-core/src/main/java/org/apache/doris/scheduler/JobRegisterFactory.java rename to fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java index 2613a0302c..eb2653b9da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/JobRegisterFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobCategory.java @@ -15,27 +15,35 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.scheduler; +package org.apache.doris.scheduler.constants; -import org.apache.doris.scheduler.registry.JobRegister; - -import java.util.concurrent.atomic.AtomicReference; +import lombok.Getter; /** - * This class provides a factory for creating instances of {@link JobRegister}. - * The factory ensures that only one instance of the client is created in a lazy manner. + * The job category is used to distinguish different types of jobs. */ -public class JobRegisterFactory { - private static final AtomicReference INSTANCE = new AtomicReference<>(); +public enum JobCategory { + COMMON(1, "common"), + SQL(2, "sql"), + ; - public static JobRegister getInstance() { - JobRegister instance = INSTANCE.get(); - if (instance == null) { - instance = new AsyncJobRegister(); - if (!INSTANCE.compareAndSet(null, instance)) { - instance = INSTANCE.get(); + @Getter + private int code; + + @Getter + private String description; + + JobCategory(int code, String description) { + this.code = code; + this.description = description; + } + + public static JobCategory getJobCategoryByCode(int code) { + for (JobCategory jobCategory : JobCategory.values()) { + if (jobCategory.getCode() == code) { + return jobCategory; } } - return instance; + return null; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java index 5c4af0b649..ae204ef948 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java @@ -19,6 +19,7 @@ package org.apache.doris.scheduler.constants; public enum JobStatus { + /** * When the task is not started, the initial state will be triggered. * The initial state can be started @@ -35,4 +36,11 @@ public enum JobStatus { * The stop state cannot be resumed */ STOPPED, + + WAITING_FINISH, + + /** + * When the task is finished, the finished state will be triggered. + */ + FINISHED } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java new file mode 100644 index 0000000000..4f4467c989 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobType.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.scheduler.constants; + +public enum JobType { + /** + * The job will be executed only once. + */ + ONE_TIME, + /** + * The job will be executed periodically. + */ + RECURRING, + /** + * JOB_TYPE_STREAMING is used to identify the streaming job. + */ + STREAMING +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java index f24f6e4e19..3428c1724f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java @@ -21,7 +21,7 @@ import lombok.Getter; /** * System scheduler event job - * They will start when scheduler starts + * They will start when scheduler starts,don't use this job in other place,it just for system inner scheduler */ public enum SystemJob { diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptor.java index 98a2736542..db7bb0321a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptor.java @@ -17,10 +17,10 @@ package org.apache.doris.scheduler.disruptor; -import org.apache.doris.scheduler.job.AsyncJobManager; +import org.apache.doris.scheduler.manager.AsyncJobManager; import com.lmax.disruptor.BlockingWaitStrategy; -import com.lmax.disruptor.EventTranslatorTwoArg; +import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.dsl.Disruptor; @@ -66,14 +66,11 @@ public class TimerTaskDisruptor implements Closeable { private boolean isClosed = false; /** - * The default {@link EventTranslatorTwoArg} to use for {@link #tryPublish(Long, Long)}. + * The default {@link EventTranslatorOneArg} to use for {@link #tryPublish(Long)}. * This is used to avoid creating a new object for each publish. */ - private static final EventTranslatorTwoArg TRANSLATOR - = (event, sequence, jobId, taskId) -> { - event.setJobId(jobId); - event.setTaskId(taskId); - }; + private static final EventTranslatorOneArg TRANSLATOR + = (event, sequence, jobId) -> event.setJobId(jobId); public TimerTaskDisruptor(AsyncJobManager asyncJobManager) { ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE; @@ -90,18 +87,17 @@ public class TimerTaskDisruptor implements Closeable { /** * Publishes an event to the disruptor. * - * @param eventId event job id - * @param taskId event task id + * @param jobId job id */ - public void tryPublish(Long eventId, Long taskId) { + public void tryPublish(Long jobId) { if (isClosed) { - log.info("tryPublish failed, disruptor is closed, eventId: {}", eventId); + log.info("tryPublish failed, disruptor is closed, jobId: {}", jobId); return; } try { - disruptor.publishEvent(TRANSLATOR, eventId, taskId); + disruptor.publishEvent(TRANSLATOR, jobId); } catch (Exception e) { - log.error("tryPublish failed, eventId: {}", eventId, e); + log.error("tryPublish failed, jobId: {}", jobId, e); } } @@ -111,7 +107,7 @@ public class TimerTaskDisruptor implements Closeable { return false; } try { - disruptor.publishEvent(TRANSLATOR, timerTaskEvent.getJobId(), timerTaskEvent.getTaskId()); + disruptor.publishEvent(TRANSLATOR, timerTaskEvent.getJobId()); return true; } catch (Exception e) { log.error("tryPublish failed, eventJobId: {}", timerTaskEvent.getJobId(), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskEvent.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskEvent.java index 3c1cfe440d..379559d24f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskEvent.java @@ -32,7 +32,5 @@ public class TimerTaskEvent { private Long jobId; - private Long taskId; - public static final EventFactory FACTORY = TimerTaskEvent::new; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskExpirationHandler.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskExpirationHandler.java index 8c4a5db681..b778871189 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskExpirationHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TimerTaskExpirationHandler.java @@ -17,9 +17,13 @@ package org.apache.doris.scheduler.disruptor; +import org.apache.doris.catalog.Env; +import org.apache.doris.scheduler.constants.JobStatus; import org.apache.doris.scheduler.constants.SystemJob; -import org.apache.doris.scheduler.job.AsyncJobManager; import org.apache.doris.scheduler.job.Job; +import org.apache.doris.scheduler.job.JobTask; +import org.apache.doris.scheduler.manager.AsyncJobManager; +import org.apache.doris.scheduler.manager.JobTaskManager; import com.lmax.disruptor.WorkHandler; import lombok.extern.slf4j.Slf4j; @@ -41,6 +45,8 @@ public class TimerTaskExpirationHandler implements WorkHandler { */ private AsyncJobManager asyncJobManager; + private JobTaskManager jobTaskManager; + /** * Constructs a new {@link TimerTaskExpirationHandler} instance with the specified event job manager. * @@ -79,21 +85,38 @@ public class TimerTaskExpirationHandler implements WorkHandler { log.info("Event job is null, eventJobId: {}", jobId); return; } - if (!job.isRunning()) { + if (!job.isRunning() && !job.getJobStatus().equals(JobStatus.WAITING_FINISH)) { log.info("Event job is not running, eventJobId: {}", jobId); return; } log.debug("Event job is running, eventJobId: {}", jobId); - checkJobIsExpired(job); + JobTask jobTask = new JobTask(jobId); try { + jobTask.setStartTimeMs(System.currentTimeMillis()); + + // TODO: We should record the result of the event task. //Object result = job.getExecutor().execute(); - job.getExecutor().execute(); - job.setLatestCompleteExecuteTimestamp(System.currentTimeMillis()); + job.getExecutor().execute(job); + job.setLatestCompleteExecuteTimeMs(System.currentTimeMillis()); + if (job.isCycleJob()) { + updateJobStatusIfPastEndTime(job); + } else { + // one time job should be finished after execute + updateOnceTimeJobStatus(job); + } + jobTask.setIsSuccessful(true); } catch (Exception e) { - log.error("Event job execute failed, jobId: {}", jobId, e); + log.warn("Event job execute failed, jobId: {}, msg : {}", jobId, e.getMessage()); job.pause(e.getMessage()); + jobTask.setErrorMsg(e.getMessage()); + jobTask.setIsSuccessful(false); } + jobTask.setEndTimeMs(System.currentTimeMillis()); + if (null == jobTaskManager) { + jobTaskManager = Env.getCurrentEnv().getJobTaskManager(); + } + jobTaskManager.addJobTask(jobTask); } /** @@ -117,9 +140,18 @@ public class TimerTaskExpirationHandler implements WorkHandler { return Objects.equals(event.getJobId(), SystemJob.SYSTEM_SCHEDULER_JOB.getId()); } - private void checkJobIsExpired(Job job) { + private void updateJobStatusIfPastEndTime(Job job) { if (job.isExpired()) { - job.pause(); + job.finish(); } } + + private void updateOnceTimeJobStatus(Job job) { + if (job.isStreamingJob()) { + asyncJobManager.putOneJobToQueen(job.getJobId()); + return; + } + job.finish(); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/exception/JobException.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/exception/JobException.java new file mode 100644 index 0000000000..8d1816d47b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/exception/JobException.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.scheduler.exception; + +/** + * This class represents a job exception that can be thrown when a job is executed. + */ +public class JobException extends Exception { + public JobException(String message) { + super(message); + } + + public JobException(String message, Throwable cause) { + super(message, cause); + } + + public JobException(Throwable cause) { + super(cause); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java index cd96b6a6e4..e67e62f267 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/JobExecutor.java @@ -17,9 +17,16 @@ package org.apache.doris.scheduler.executor; +import org.apache.doris.scheduler.exception.JobException; +import org.apache.doris.scheduler.job.Job; + /** * This interface represents a callback for an event registration. All event registrations * must implement this interface to provide an execution method. + * We will persist JobExecutor in the database, and then execute it when the scheduler starts. + * We use Gson to serialize and deserialize JobExecutor. so the implementation of JobExecutor needs to be serializable. + * You can see @org.apache.doris.persist.gson.GsonUtils.java for details.When you implement JobExecutor,pls make sure + * you can serialize and deserialize it. * * @param The result type of the event job execution. */ @@ -32,6 +39,6 @@ public interface JobExecutor { * * @return The result of the event job execution. */ - T execute(); + T execute(Job job) throws JobException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/MemoryTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/MemoryTaskExecutor.java new file mode 100644 index 0000000000..b5493b3758 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/MemoryTaskExecutor.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.scheduler.executor; + +import org.apache.doris.scheduler.exception.JobException; + +/** + * A functional interface for executing a job. + * todo + */ +@FunctionalInterface +public interface MemoryTaskExecutor { + + /** + * Executes the event job and returns the result. + * Exceptions will be caught internally, so there is no need to define or throw them separately. + */ + void execute() throws JobException; +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java new file mode 100644 index 0000000000..2f2495f7f8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/executor/SqlJobExecutor.java @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.scheduler.executor; + +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.scheduler.exception.JobException; +import org.apache.doris.scheduler.job.Job; +import org.apache.doris.thrift.TUniqueId; + +import com.google.gson.annotations.SerializedName; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +import java.util.UUID; + +/** + * we use this executor to execute sql job + * @param the state of sql job, we can record the state of sql job + */ +@Slf4j +public class SqlJobExecutor implements JobExecutor { + + @Getter + @Setter + @SerializedName(value = "sql") + private String sql; + + public SqlJobExecutor(String sql) { + this.sql = sql; + } + + @Override + public QueryState execute(Job job) throws JobException { + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setCluster(ClusterNamespace.getClusterNameFromFullName(job.getDbName())); + ctx.setDatabase(job.getDbName()); + ctx.setQualifiedUser(job.getUser()); + ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(job.getUser(), "%")); + ctx.getState().reset(); + ctx.setThreadLocalInfo(); + String taskIdString = UUID.randomUUID().toString(); + UUID taskId = UUID.fromString(taskIdString); + TUniqueId queryId = new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits()); + ctx.setQueryId(queryId); + try { + StmtExecutor executor = new StmtExecutor(ctx, sql); + executor.execute(queryId); + log.debug("execute sql job success, sql: {}, state is: {}", sql, ctx.getState()); + return (QueryState) ctx.getState(); + } catch (Exception e) { + log.warn("execute sql job failed, sql: {}, error: {}", sql, e); + throw new JobException("execute sql job failed, sql: " + sql + ", error: " + e.getMessage()); + } + + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/AsyncJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/AsyncJobManager.java deleted file mode 100644 index e0944bf24a..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/AsyncJobManager.java +++ /dev/null @@ -1,262 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.scheduler.job; - -import org.apache.doris.scheduler.disruptor.TimerTaskDisruptor; - -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import lombok.extern.slf4j.Slf4j; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -@Slf4j -public class AsyncJobManager implements Closeable { - - private final ConcurrentHashMap jobMap = new ConcurrentHashMap<>(128); - - private long lastBatchSchedulerTimestamp; - - /** - * batch scheduler interval time - */ - private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = 10 * 60 * 1000L; - - private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS; - - - private boolean isClosed = false; - - /** - * key: jobid - * value: timeout list for one job - * it's used to cancel task, if task has started, it can't be canceled - */ - private final ConcurrentHashMap> jobTimeoutMap = - new ConcurrentHashMap<>(128); - - /** - * scheduler tasks, it's used to scheduler job - */ - private final HashedWheelTimer dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, - 660); - - /** - * Producer and Consumer model - * disruptor is used to handle task - * disruptor will start a thread pool to handle task - */ - private final TimerTaskDisruptor disruptor; - - public AsyncJobManager() { - dorisTimer.start(); - this.disruptor = new TimerTaskDisruptor(this); - this.lastBatchSchedulerTimestamp = System.currentTimeMillis(); - batchSchedulerTasks(); - cycleSystemSchedulerTasks(); - } - - public Long registerJob(Job job) { - if (!job.checkJobParam()) { - log.warn("registerJob failed, job: {} param is invalid", job); - return null; - } - if (job.getStartTimestamp() != 0L) { - job.setNextExecuteTimestamp(job.getStartTimestamp() + job.getIntervalMilliSeconds()); - } else { - job.setNextExecuteTimestamp(System.currentTimeMillis() + job.getIntervalMilliSeconds()); - } - - if (job.getNextExecuteTimestamp() < BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp) { - List executeTimestamp = findTasksBetweenTime(job, System.currentTimeMillis(), - BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp, - job.getNextExecuteTimestamp()); - if (!executeTimestamp.isEmpty()) { - for (Long timestamp : executeTimestamp) { - putOneTask(job.getJobId(), timestamp); - } - } - } - - jobMap.putIfAbsent(job.getJobId(), job); - return job.getJobId(); - } - - public void unregisterJob(Long jobId) { - jobMap.remove(jobId); - } - - public boolean pauseJob(Long jobId) { - if (jobMap.get(jobId) == null) { - log.warn("pauseJob failed, jobId: {} not exist", jobId); - return false; - } - cancelJobAllTask(jobId); - jobMap.get(jobId).pause(); - return true; - } - - public boolean resumeJob(Long jobId) { - if (jobMap.get(jobId) == null) { - log.warn("resumeJob failed, jobId: {} not exist", jobId); - return false; - } - jobMap.get(jobId).resume(); - return true; - } - - public boolean stopJob(Long jobId) { - if (jobMap.get(jobId) == null) { - log.warn("stopJob failed, jobId: {} not exist", jobId); - return false; - } - cancelJobAllTask(jobId); - jobMap.get(jobId).stop(); - return true; - } - - public Job getJob(Long jobId) { - return jobMap.get(jobId); - } - - public Map getAllJob() { - return jobMap; - } - - public boolean batchSchedulerTasks() { - executeJobIdsWithinLastTenMinutesWindow(); - return true; - } - - public List findTasksBetweenTime(Job job, Long startTime, Long endTime, Long nextExecuteTime) { - List jobExecuteTimes = new ArrayList<>(); - if (System.currentTimeMillis() < startTime) { - return jobExecuteTimes; - } - while (endTime >= nextExecuteTime) { - if (job.isTaskTimeExceeded()) { - break; - } - jobExecuteTimes.add(nextExecuteTime); - nextExecuteTime = job.getExecuteTimestampAndGeneratorNext(); - } - return jobExecuteTimes; - } - - /** - * We will get the task in the next time window, and then hand it over to the time wheel for timing trigger - */ - private void executeJobIdsWithinLastTenMinutesWindow() { - if (jobMap.isEmpty()) { - return; - } - jobMap.forEach((k, v) -> { - if (v.isRunning() && (v.getNextExecuteTimestamp() + v.getIntervalMilliSeconds() - < lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS)) { - List executeTimes = findTasksBetweenTime(v, lastBatchSchedulerTimestamp, - lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS, - v.getNextExecuteTimestamp()); - if (!executeTimes.isEmpty()) { - for (Long executeTime : executeTimes) { - putOneTask(v.getJobId(), executeTime); - } - } - } - }); - this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS; - } - - /** - * We will cycle system scheduler tasks every 10 minutes. - * Jobs will be re-registered after the task is completed - */ - private void cycleSystemSchedulerTasks() { - dorisTimer.newTimeout(timeout -> { - batchSchedulerTasks(); - cycleSystemSchedulerTasks(); - }, BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS, TimeUnit.MILLISECONDS); - } - - public void putOneTask(Long jobId, Long startExecuteTime) { - DorisTimerTask task = new DorisTimerTask(jobId, startExecuteTime, disruptor); - if (isClosed) { - log.info("putOneTask failed, scheduler is closed, jobId: {}", task.getJobId()); - return; - } - long delay = getDelaySecond(task.getStartTimestamp()); - Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS); - if (timeout == null) { - log.error("putOneTask failed, jobId: {}", task.getJobId()); - return; - } - if (jobTimeoutMap.containsKey(task.getJobId())) { - jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout); - return; - } - Map timeoutMap = new ConcurrentHashMap<>(); - timeoutMap.put(task.getTaskId(), timeout); - jobTimeoutMap.put(task.getJobId(), timeoutMap); - } - - // cancel all task for one job - // if task has started, it can't be canceled - public void cancelJobAllTask(Long jobId) { - if (!jobTimeoutMap.containsKey(jobId)) { - return; - } - - jobTimeoutMap.get(jobId).values().forEach(timeout -> { - if (!timeout.isExpired() || timeout.isCancelled()) { - timeout.cancel(); - } - }); - } - - public void stopTask(Long jobId, Long taskId) { - if (!jobTimeoutMap.containsKey(jobId)) { - return; - } - cancelJobAllTask(jobId); - jobTimeoutMap.get(jobId).remove(taskId); - } - - // get delay time, if startTimestamp is less than now, return 0 - private long getDelaySecond(long startTimestamp) { - long delay = 0; - long now = System.currentTimeMillis(); - if (startTimestamp > now) { - delay = startTimestamp - now; - } else { - log.warn("startTimestamp is less than now, startTimestamp: {}, now: {}", startTimestamp, now); - } - return delay / 1000; - } - - @Override - public void close() throws IOException { - isClosed = true; - dorisTimer.stop(); - disruptor.close(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/DorisTimerTask.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/DorisTimerTask.java index 7522548ad6..eab92b7fb8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/DorisTimerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/DorisTimerTask.java @@ -53,6 +53,6 @@ public class DorisTimerTask implements TimerTask { if (timeout.isCancelled()) { return; } - timerTaskDisruptor.tryPublish(jobId, taskId); + timerTaskDisruptor.tryPublish(jobId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java index 6923e2277f..3d29b0b842 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java @@ -17,12 +17,25 @@ package org.apache.doris.scheduler.job; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.scheduler.common.IntervalUnit; +import org.apache.doris.scheduler.constants.JobCategory; import org.apache.doris.scheduler.constants.JobStatus; +import org.apache.doris.scheduler.constants.JobType; import org.apache.doris.scheduler.executor.JobExecutor; +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; import lombok.Data; -import java.util.UUID; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; /** * Job is the core of the scheduler module, which is used to store the Job information of the job module. @@ -32,53 +45,90 @@ import java.util.UUID; * job. */ @Data -public class Job { +public class Job implements Writable { - public Job(String jobName, Long intervalMilliSeconds, Long startTimestamp, Long endTimestamp, + public Job(String jobName, Long intervalMilliSeconds, Long startTimeMs, Long endTimeMs, JobExecutor executor) { this.jobName = jobName; this.executor = executor; - this.intervalMilliSeconds = intervalMilliSeconds; - this.startTimestamp = null == startTimestamp ? 0L : startTimestamp; - this.endTimestamp = null == endTimestamp ? 0L : endTimestamp; + this.intervalMs = intervalMilliSeconds; + this.startTimeMs = null == startTimeMs ? 0L : startTimeMs; + this.endTimeMs = null == endTimeMs ? 0L : endTimeMs; + this.jobStatus = JobStatus.RUNNING; + this.jobId = Env.getCurrentEnv().getNextId(); } - private Long jobId = UUID.randomUUID().getMostSignificantBits(); + public Job() { + this.jobId = Env.getCurrentEnv().getNextId(); + } + @SerializedName("jobId") + private Long jobId; + + @SerializedName("jobName") private String jobName; + @SerializedName("dbName") + private String dbName; + /** * The status of the job, which is used to control the execution of the job. * * @see JobStatus */ - private JobStatus jobStatus = JobStatus.RUNNING; + @SerializedName("jobStatus") + private JobStatus jobStatus; /** * The executor of the job. * * @see JobExecutor */ + @SerializedName("executor") private JobExecutor executor; + @SerializedName("user") private String user; + @SerializedName("isCycleJob") + private boolean isCycleJob = false; + + @SerializedName("isStreamingJob") + private boolean isStreamingJob = false; + + @SerializedName("intervalMs") + private Long intervalMs = 0L; + @SerializedName("startTimeMs") + private Long startTimeMs = 0L; + + @SerializedName("endTimeMs") + private Long endTimeMs = 0L; + + @SerializedName("timezone") + private String timezone; + + @SerializedName("jobCategory") + private JobCategory jobCategory; + + + @SerializedName("latestStartExecuteTimeMs") + private Long latestStartExecuteTimeMs = 0L; + @SerializedName("latestCompleteExecuteTimeMs") + private Long latestCompleteExecuteTimeMs = 0L; + + @SerializedName("intervalUnit") + private IntervalUnit intervalUnit; + @SerializedName("originInterval") + private Long originInterval; + @SerializedName("nextExecuteTimeMs") + private Long nextExecuteTimeMs = 0L; + + @SerializedName("comment") + private String comment; + + @SerializedName("errMsg") private String errMsg; - private Long intervalMilliSeconds; - - private Long updateTime; - - private Long nextExecuteTimestamp; - private Long startTimestamp = 0L; - - private Long endTimestamp = 0L; - - private Long firstExecuteTimestamp = 0L; - - private Long latestStartExecuteTimestamp = 0L; - private Long latestCompleteExecuteTimestamp = 0L; - public boolean isRunning() { return jobStatus == JobStatus.RUNNING; } @@ -88,32 +138,32 @@ public class Job { } public boolean isExpired(long nextExecuteTimestamp) { - if (endTimestamp == 0L) { + if (endTimeMs == 0L) { return false; } - return nextExecuteTimestamp > endTimestamp; + return nextExecuteTimestamp > endTimeMs; } public boolean isTaskTimeExceeded() { - if (endTimestamp == 0L) { + if (endTimeMs == 0L) { return false; } - return System.currentTimeMillis() >= endTimestamp || nextExecuteTimestamp > endTimestamp; + return System.currentTimeMillis() >= endTimeMs || nextExecuteTimeMs > endTimeMs; } public boolean isExpired() { - if (endTimestamp == 0L) { + if (endTimeMs == 0L) { return false; } - return System.currentTimeMillis() >= endTimestamp; + return System.currentTimeMillis() >= endTimeMs; } public Long getExecuteTimestampAndGeneratorNext() { - this.latestStartExecuteTimestamp = nextExecuteTimestamp; - // todo The problem of delay should be considered. If it is greater than the ten-minute time window, + this.latestStartExecuteTimeMs = nextExecuteTimeMs; + // todo The problem of delay should be considered. If it is greater than the ten-minute time window, // should the task be lost or executed on a new time window? - this.nextExecuteTimestamp = latestStartExecuteTimestamp + intervalMilliSeconds; - return nextExecuteTimestamp; + this.nextExecuteTimeMs = latestStartExecuteTimeMs + intervalMs; + return nextExecuteTimeMs; } public void pause() { @@ -125,6 +175,10 @@ public class Job { this.errMsg = errMsg; } + public void finish() { + this.jobStatus = JobStatus.FINISHED; + } + public void resume() { this.jobStatus = JobStatus.RUNNING; } @@ -134,15 +188,59 @@ public class Job { } public boolean checkJobParam() { - if (startTimestamp != 0L && startTimestamp < System.currentTimeMillis()) { + if (startTimeMs != 0L && startTimeMs < System.currentTimeMillis()) { return false; } - if (endTimestamp != 0L && endTimestamp < System.currentTimeMillis()) { + if (endTimeMs != 0L && endTimeMs < System.currentTimeMillis()) { return false; } - if (intervalMilliSeconds == null || intervalMilliSeconds <= 0L) { + + if (isCycleJob && (intervalMs == null || intervalMs <= 0L)) { + return false; + } + if (null == jobCategory) { return false; } return null != executor; } + + + @Override + public void write(DataOutput out) throws IOException { + String jobData = GsonUtils.GSON.toJson(this); + Text.writeString(out, jobData); + } + + public static Job readFields(DataInput in) throws IOException { + return GsonUtils.GSON.fromJson(Text.readString(in), Job.class); + } + + public List getShowInfo() { + List row = Lists.newArrayList(); + row.add(String.valueOf(jobId)); + row.add(dbName); + row.add(jobName); + row.add(user); + row.add(timezone); + if (isCycleJob) { + row.add(JobType.RECURRING.name()); + } else { + if (isStreamingJob) { + row.add(JobType.STREAMING.name()); + } else { + row.add(JobType.ONE_TIME.name()); + } + } + row.add(isCycleJob ? "null" : TimeUtils.longToTimeString(startTimeMs)); + row.add(isCycleJob ? originInterval.toString() : "null"); + row.add(isCycleJob ? intervalUnit.name() : "null"); + row.add(isCycleJob && startTimeMs > 0 ? TimeUtils.longToTimeString(startTimeMs) : "null"); + row.add(isCycleJob && endTimeMs > 0 ? TimeUtils.longToTimeString(endTimeMs) : "null"); + row.add(jobStatus.name()); + row.add(latestCompleteExecuteTimeMs <= 0L ? "null" : TimeUtils.longToTimeString(latestCompleteExecuteTimeMs)); + row.add(errMsg == null ? "null" : errMsg); + row.add(comment == null ? "null" : comment); + return row; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java new file mode 100644 index 0000000000..254875bb75 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/JobTask.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.scheduler.job; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; +import lombok.Data; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +@Data +public class JobTask implements Writable { + + @SerializedName("jobId") + private Long jobId; + @SerializedName("taskId") + private Long taskId; + @SerializedName("startTimeMs") + private Long startTimeMs; + @SerializedName("endTimeMs") + private Long endTimeMs; + @SerializedName("successful") + private Boolean isSuccessful; + @SerializedName("errorMsg") + private String errorMsg; + + public JobTask(Long jobId) { + //it's enough to use nanoTime to identify a task + this.taskId = System.nanoTime(); + this.jobId = jobId; + } + + public List getShowInfo() { + List row = Lists.newArrayList(); + row.add(String.valueOf(jobId)); + row.add(String.valueOf(taskId)); + row.add(TimeUtils.longToTimeString(startTimeMs)); + row.add(null == endTimeMs ? "null" : TimeUtils.longToTimeString(endTimeMs)); + if (endTimeMs == null) { + row.add("RUNNING"); + } else { + row.add(isSuccessful ? "SUCCESS" : "FAILED"); + } + if (null == errorMsg) { + row.add("null"); + } else { + row.add(errorMsg); + } + return row; + } + + @Override + public void write(DataOutput out) throws IOException { + String jobData = GsonUtils.GSON.toJson(this); + Text.writeString(out, jobData); + } + + public static JobTask readFields(DataInput in) throws IOException { + return GsonUtils.GSON.fromJson(Text.readString(in), JobTask.class); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/AsyncJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/AsyncJobManager.java new file mode 100644 index 0000000000..c2d2ab4151 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/AsyncJobManager.java @@ -0,0 +1,481 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.scheduler.manager; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.PatternMatcher; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.scheduler.constants.JobCategory; +import org.apache.doris.scheduler.constants.JobStatus; +import org.apache.doris.scheduler.disruptor.TimerTaskDisruptor; +import org.apache.doris.scheduler.job.DorisTimerTask; +import org.apache.doris.scheduler.job.Job; + +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.io.Closeable; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class AsyncJobManager implements Closeable, Writable { + + private final ConcurrentHashMap jobMap = new ConcurrentHashMap<>(128); + + private long lastBatchSchedulerTimestamp; + + /** + * batch scheduler interval time + */ + private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = 10 * 60 * 1000L; + + private boolean isClosed = false; + + /** + * key: jobid + * value: timeout list for one job + * it's used to cancel task, if task has started, it can't be canceled + */ + private final ConcurrentHashMap> jobTimeoutMap = new ConcurrentHashMap<>(128); + + /** + * scheduler tasks, it's used to scheduler job + */ + private final HashedWheelTimer dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660); + + /** + * Producer and Consumer model + * disruptor is used to handle task + * disruptor will start a thread pool to handle task + */ + private final TimerTaskDisruptor disruptor; + + public AsyncJobManager() { + dorisTimer.start(); + this.disruptor = new TimerTaskDisruptor(this); + this.lastBatchSchedulerTimestamp = System.currentTimeMillis(); + batchSchedulerTasks(); + cycleSystemSchedulerTasks(); + } + + public Long registerJob(Job job) throws DdlException { + if (!job.checkJobParam()) { + throw new DdlException("Job param is invalid, please check time param"); + } + checkIsJobNameUsed(job.getDbName(), job.getJobName(), job.getJobCategory()); + jobMap.putIfAbsent(job.getJobId(), job); + initAndSchedulerJob(job); + Env.getCurrentEnv().getEditLog().logCreateJob(job); + return job.getJobId(); + } + + public void replayCreateJob(Job job) { + jobMap.putIfAbsent(job.getJobId(), job); + initAndSchedulerJob(job); + log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) + .add("msg", "replay create scheduler job").build()); + } + + /** + * Replay update load job. + **/ + public void replayUpdateJob(Job job) { + jobMap.put(job.getJobId(), job); + if (JobStatus.RUNNING.equals(job.getJobStatus())) { + initAndSchedulerJob(job); + } + log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) + .add("msg", "replay update scheduler job").build()); + } + + public void replayDeleteJob(Job job) { + if (null == jobMap.get(job.getJobId())) { + return; + } + jobMap.remove(job.getJobId()); + log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) + .add("msg", "replay delete scheduler job").build()); + } + + private void checkIsJobNameUsed(String dbName, String jobName, JobCategory jobCategory) throws DdlException { + Optional optionalJob = jobMap.values().stream().filter(job -> job.getJobCategory().equals(jobCategory)) + .filter(job -> job.getDbName().equals(dbName)) + .filter(job -> job.getJobName().equals(jobName)).findFirst(); + if (optionalJob.isPresent()) { + throw new DdlException("Name " + jobName + " already used in db " + dbName); + } + } + + private void initAndSchedulerJob(Job job) { + if (!job.getJobStatus().equals(JobStatus.RUNNING)) { + return; + } + + Long currentTimeMs = System.currentTimeMillis(); + Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, job.getStartTimeMs(), + job.getIntervalMs(), job.isCycleJob()); + job.setNextExecuteTimeMs(nextExecuteTimeMs); + if (job.getNextExecuteTimeMs() < BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp) { + List executeTimestamp = findTasksBetweenTime(job, + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp, + job.getNextExecuteTimeMs()); + if (!executeTimestamp.isEmpty()) { + for (Long timestamp : executeTimestamp) { + putOneTask(job.getJobId(), timestamp); + } + } + } + } + + private Long findFistExecuteTime(Long currentTimeMs, Long startTimeMs, Long intervalMs, boolean isCycleJob) { + if (startTimeMs != 0L && startTimeMs > currentTimeMs) { + return startTimeMs; + } + // if it's not cycle job and already delay, next execute time is current time + if (!isCycleJob) { + return currentTimeMs; + } + + long cycle = (currentTimeMs - startTimeMs) / intervalMs; + if ((currentTimeMs - startTimeMs) % intervalMs > 0) { + cycle += 1; + } + return startTimeMs + cycle * intervalMs; + } + + public void unregisterJob(Long jobId) { + jobMap.remove(jobId); + } + + public void pauseJob(Long jobId) { + Job job = jobMap.get(jobId); + if (jobMap.get(jobId) == null) { + log.warn("pauseJob failed, jobId: {} not exist", jobId); + } + if (jobMap.get(jobId).getJobStatus().equals(JobStatus.PAUSED)) { + log.warn("pauseJob failed, jobId: {} is already paused", jobId); + } + pauseJob(job); + } + + public void stopJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException { + Optional optionalJob = findJob(dbName, jobName, jobCategory); + + if (!optionalJob.isPresent()) { + throw new DdlException("Job " + jobName + " not exist in db " + dbName); + } + Job job = optionalJob.get(); + if (job.getJobStatus().equals(JobStatus.STOPPED)) { + throw new DdlException("Job " + jobName + " is already stopped"); + } + stopJob(optionalJob.get()); + Env.getCurrentEnv().getEditLog().logDeleteJob(optionalJob.get()); + } + + private void stopJob(Job job) { + if (JobStatus.RUNNING.equals(job.getJobStatus())) { + cancelJobAllTask(job.getJobId()); + } + job.setJobStatus(JobStatus.STOPPED); + jobMap.get(job.getJobId()).stop(); + Env.getCurrentEnv().getEditLog().logDeleteJob(job); + Env.getCurrentEnv().getJobTaskManager().deleteJobTasks(job.getJobId()); + } + + + public void resumeJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException { + Optional optionalJob = findJob(dbName, jobName, jobCategory); + if (!optionalJob.isPresent()) { + throw new DdlException("Job " + jobName + " not exist in db " + dbName); + } + Job job = optionalJob.get(); + if (!job.getJobStatus().equals(JobStatus.PAUSED)) { + throw new DdlException("Job " + jobName + " is not paused"); + } + resumeJob(job); + } + + private void resumeJob(Job job) { + cancelJobAllTask(job.getJobId()); + job.setJobStatus(JobStatus.RUNNING); + jobMap.get(job.getJobId()).resume(); + initAndSchedulerJob(job); + Env.getCurrentEnv().getEditLog().logUpdateJob(job); + } + + public void pauseJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException { + Optional optionalJob = findJob(dbName, jobName, jobCategory); + if (!optionalJob.isPresent()) { + throw new DdlException("Job " + jobName + " not exist in db " + dbName); + } + Job job = optionalJob.get(); + if (!job.getJobStatus().equals(JobStatus.RUNNING)) { + throw new DdlException("Job " + jobName + " is not running"); + } + pauseJob(job); + } + + private void pauseJob(Job job) { + cancelJobAllTask(job.getJobId()); + job.setJobStatus(JobStatus.PAUSED); + jobMap.get(job.getJobId()).pause(); + Env.getCurrentEnv().getEditLog().logUpdateJob(job); + } + + private Optional findJob(String dbName, String jobName, JobCategory jobCategory) { + return jobMap.values().stream().filter(job -> checkJobMatch(job, dbName, jobName, jobCategory)).findFirst(); + } + + private boolean checkJobMatch(Job job, String dbName, String jobName, JobCategory jobCategory) { + return job.getDbName().equals(dbName) && job.getJobName().equals(jobName) + && job.getJobCategory().equals(jobCategory); + } + + + public void resumeJob(Long jobId) { + if (jobMap.get(jobId) == null) { + log.warn("resumeJob failed, jobId: {} not exist", jobId); + return; + } + Job job = jobMap.get(jobId); + resumeJob(job); + } + + public void stopJob(Long jobId) { + Job job = jobMap.get(jobId); + if (null == job) { + log.warn("stopJob failed, jobId: {} not exist", jobId); + return; + } + if (job.getJobStatus().equals(JobStatus.STOPPED)) { + log.warn("stopJob failed, jobId: {} is already stopped", jobId); + return; + } + stopJob(job); + } + + public Job getJob(Long jobId) { + return jobMap.get(jobId); + } + + public Map getAllJob() { + return jobMap; + } + + public void batchSchedulerTasks() { + executeJobIdsWithinLastTenMinutesWindow(); + } + + private List findTasksBetweenTime(Job job, Long endTimeEndWindow, Long nextExecuteTime) { + List jobExecuteTimes = new ArrayList<>(); + if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) { + jobExecuteTimes.add(nextExecuteTime); + if (job.isStreamingJob()) { + job.setJobStatus(JobStatus.RUNNING); + } else { + job.setJobStatus(JobStatus.WAITING_FINISH); + } + return jobExecuteTimes; + } + while (endTimeEndWindow >= nextExecuteTime) { + if (job.isTaskTimeExceeded()) { + break; + } + jobExecuteTimes.add(nextExecuteTime); + nextExecuteTime = job.getExecuteTimestampAndGeneratorNext(); + } + return jobExecuteTimes; + } + + /** + * We will get the task in the next time window, and then hand it over to the time wheel for timing trigger + */ + private void executeJobIdsWithinLastTenMinutesWindow() { + if (jobMap.isEmpty()) { + return; + } + jobMap.forEach((k, v) -> { + if (v.isRunning() && (v.getNextExecuteTimeMs() + + v.getIntervalMs() < lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS)) { + List executeTimes = findTasksBetweenTime( + v, lastBatchSchedulerTimestamp + BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS, + v.getNextExecuteTimeMs()); + if (!executeTimes.isEmpty()) { + for (Long executeTime : executeTimes) { + putOneTask(v.getJobId(), executeTime); + } + } + } + }); + this.lastBatchSchedulerTimestamp += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS; + } + + /** + * We will cycle system scheduler tasks every 10 minutes. + * Jobs will be re-registered after the task is completed + */ + private void cycleSystemSchedulerTasks() { + dorisTimer.newTimeout(timeout -> { + batchSchedulerTasks(); + cycleSystemSchedulerTasks(); + }, BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS, TimeUnit.MILLISECONDS); + } + + /** + * put one task to time wheel,it's well be trigger after delay milliseconds + * if the scheduler is closed, the task will not be put into the time wheel + * if delay is less than 0, the task will be trigger immediately + * + * @param jobId job id, we will use it to find the job + * @param startExecuteTime the task will be trigger in this time, unit is millisecond,and we will convert it to + * delay seconds, we just can be second precision + */ + public void putOneTask(Long jobId, Long startExecuteTime) { + DorisTimerTask task = new DorisTimerTask(jobId, startExecuteTime, disruptor); + if (isClosed) { + log.info("putOneTask failed, scheduler is closed, jobId: {}", task.getJobId()); + return; + } + long delay = getDelaySecond(task.getStartTimestamp()); + Timeout timeout = dorisTimer.newTimeout(task, delay, TimeUnit.SECONDS); + if (timeout == null) { + log.error("putOneTask failed, jobId: {}", task.getJobId()); + return; + } + if (jobTimeoutMap.containsKey(task.getJobId())) { + jobTimeoutMap.get(task.getJobId()).put(task.getTaskId(), timeout); + return; + } + Map timeoutMap = new ConcurrentHashMap<>(); + timeoutMap.put(task.getTaskId(), timeout); + jobTimeoutMap.put(task.getJobId(), timeoutMap); + } + + // cancel all task for one job + // if task has started, it can't be canceled + public void cancelJobAllTask(Long jobId) { + if (!jobTimeoutMap.containsKey(jobId)) { + return; + } + + jobTimeoutMap.get(jobId).values().forEach(timeout -> { + if (!timeout.isExpired() || timeout.isCancelled()) { + timeout.cancel(); + } + }); + } + + public void stopTask(Long jobId, Long taskId) { + if (!jobTimeoutMap.containsKey(jobId)) { + return; + } + cancelJobAllTask(jobId); + jobTimeoutMap.get(jobId).remove(taskId); + } + + // get delay time, if startTimestamp is less than now, return 0 + private long getDelaySecond(long startTimestamp) { + long delay = 0; + long now = System.currentTimeMillis(); + if (startTimestamp > now) { + delay = startTimestamp - now; + } else { + //if execute time is less than now, return 0,immediately execute + log.warn("startTimestamp is less than now, startTimestamp: {}, now: {}", startTimestamp, now); + return delay; + } + return delay / 1000; + } + + @Override + public void close() throws IOException { + isClosed = true; + dorisTimer.stop(); + disruptor.close(); + } + + /** + * sort by job id + * + * @param dbFullName database name + * @param category job category + * @param matcher job name matcher + */ + public List queryJob(String dbFullName, String jobName, JobCategory category, PatternMatcher matcher) { + List jobs = new ArrayList<>(); + jobMap.values().forEach(job -> { + if (matchJob(job, dbFullName, jobName, category, matcher)) { + jobs.add(job); + } + }); + return jobs; + } + + private boolean matchJob(Job job, String dbFullName, String jobName, JobCategory category, PatternMatcher matcher) { + if (StringUtils.isNotBlank(dbFullName) && !job.getDbName().equalsIgnoreCase(dbFullName)) { + return false; + } + if (StringUtils.isNotBlank(jobName) && !job.getJobName().equalsIgnoreCase(jobName)) { + return false; + } + if (category != null && !job.getJobCategory().equals(category)) { + return false; + } + return null == matcher || matcher.match(job.getJobName()); + } + + public void putOneJobToQueen(Long jobId) { + disruptor.tryPublish(jobId); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(jobMap.size()); + for (Job job : jobMap.values()) { + job.write(out); + } + } + + /** + * read job from data input, and init job + * + * @param in data input + * @throws IOException io exception when read data input error + */ + public void readFields(DataInput in) throws IOException { + int size = in.readInt(); + for (int i = 0; i < size; i++) { + Job job = Job.readFields(in); + jobMap.put(job.getJobId(), job); + initAndSchedulerJob(job); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java new file mode 100644 index 0000000000..c22c89e6d5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/JobTaskManager.java @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.scheduler.manager; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.scheduler.job.JobTask; + +import lombok.extern.slf4j.Slf4j; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Slf4j +public class JobTaskManager implements Writable { + + private static final Integer TASK_MAX_NUM = Config.scheduler_job_task_max_num; + + private ConcurrentHashMap> jobTaskMap = new ConcurrentHashMap<>(16); + + public void addJobTask(JobTask jobTask) { + ConcurrentLinkedQueue jobTasks = jobTaskMap + .computeIfAbsent(jobTask.getJobId(), k -> new ConcurrentLinkedQueue<>()); + jobTasks.add(jobTask); + if (jobTasks.size() > TASK_MAX_NUM) { + JobTask oldTask = jobTasks.poll(); + Env.getCurrentEnv().getEditLog().logDeleteJobTask(oldTask); + } + Env.getCurrentEnv().getEditLog().logCreateJobTask(jobTask); + } + + public List getJobTasks(Long jobId) { + if (jobTaskMap.containsKey(jobId)) { + ConcurrentLinkedQueue jobTasks = jobTaskMap.get(jobId); + List jobTaskList = new LinkedList<>(jobTasks); + Collections.reverse(jobTaskList); + return jobTaskList; + } + return new ArrayList<>(); + } + + public void replayCreateTask(JobTask task) { + ConcurrentLinkedQueue jobTasks = jobTaskMap + .computeIfAbsent(task.getJobId(), k -> new ConcurrentLinkedQueue<>()); + jobTasks.add(task); + log.info(new LogBuilder(LogKey.SCHEDULER_TASK, task.getTaskId()) + .add("msg", "replay create scheduler task").build()); + } + + public void replayDeleteTask(JobTask task) { + ConcurrentLinkedQueue jobTasks = jobTaskMap.get(task.getJobId()); + if (jobTasks != null) { + jobTasks.remove(task); + } + log.info(new LogBuilder(LogKey.SCHEDULER_TASK, task.getTaskId()) + .add("msg", "replay delete scheduler task").build()); + } + + public void deleteJobTasks(Long jobId) { + ConcurrentLinkedQueue jobTasks = jobTaskMap.get(jobId); + if (jobTasks != null) { + JobTask jobTask = jobTasks.poll(); + log.info(new LogBuilder(LogKey.SCHEDULER_TASK, jobTask.getTaskId()) + .add("msg", "replay delete scheduler task").build()); + } + jobTaskMap.remove(jobId); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(jobTaskMap.size()); + for (Map.Entry> entry : jobTaskMap.entrySet()) { + out.writeLong(entry.getKey()); + out.writeInt(entry.getValue().size()); + for (JobTask jobTask : entry.getValue()) { + jobTask.write(out); + } + } + + } + + public void readFields(DataInput in) throws IOException { + int size = in.readInt(); + for (int i = 0; i < size; i++) { + Long jobId = in.readLong(); + int taskSize = in.readInt(); + ConcurrentLinkedQueue jobTasks = new ConcurrentLinkedQueue<>(); + for (int j = 0; j < taskSize; j++) { + JobTask jobTask = JobTask.readFields(in); + jobTasks.add(jobTask); + } + jobTaskMap.put(jobId, jobTasks); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/MemoryTaskRegister.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/MemoryTaskRegister.java new file mode 100644 index 0000000000..b8fe81d801 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/MemoryTaskRegister.java @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.scheduler.registry; + +import org.apache.doris.scheduler.executor.MemoryTaskExecutor; + +/** + * todo + * Support in-memory job registration in the future + */ +public interface MemoryTaskRegister { + + Long registerTask(MemoryTaskExecutor executor); + + void cancelTask(Long taskId); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/JobRegister.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java similarity index 76% rename from fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/JobRegister.java rename to fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java index ebb6b0d590..4ee17bb8df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/JobRegister.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/registry/PersistentJobRegister.java @@ -17,26 +17,31 @@ package org.apache.doris.scheduler.registry; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.PatternMatcher; +import org.apache.doris.scheduler.constants.JobCategory; import org.apache.doris.scheduler.executor.JobExecutor; +import org.apache.doris.scheduler.job.Job; import java.io.IOException; +import java.util.List; /** * This interface provides a contract for registering timed scheduling events. * The implementation should trigger events in a timely manner using a specific algorithm. * The execution of the events may be asynchronous and not guarantee strict timing accuracy. */ -public interface JobRegister { +public interface PersistentJobRegister { /** * Register a job * - * @param name job name,it's not unique - * @param intervalMs job interval, unit: ms - * @param executor job executor @See {@link JobExecutor} + * @param name job name,it's not unique + * @param intervalMs job interval, unit: ms + * @param executor job executor @See {@link JobExecutor} * @return event job id */ - Long registerJob(String name, Long intervalMs, JobExecutor executor); + Long registerJob(String name, Long intervalMs, JobExecutor executor) throws DdlException; /** * Register a job @@ -49,7 +54,7 @@ public interface JobRegister { * @param executor event job executor @See {@link JobExecutor} * @return job id */ - Long registerJob(String name, Long intervalMs, Long startTimeStamp, JobExecutor executor); + Long registerJob(String name, Long intervalMs, Long startTimeStamp, JobExecutor executor) throws DdlException; /** @@ -68,18 +73,21 @@ public interface JobRegister { * @return event job id */ Long registerJob(String name, Long intervalMs, Long startTimeStamp, Long endTimeStamp, - JobExecutor executor); + JobExecutor executor) throws DdlException; /** * if job is running, pause it * pause means event job will not be executed in the next cycle,but current cycle will not be interrupted * we can resume it by {@link #resumeJob(Long)} * - * @param eventId event job id - * if eventId not exist, return false - * @return true if pause success, false if pause failed + * @param jodId job id + * if jobId not exist, return false */ - Boolean pauseJob(Long jodId); + void pauseJob(Long jodId); + + void pauseJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException; + + void resumeJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException; /** * if job is running, stop it @@ -88,17 +96,21 @@ public interface JobRegister { * we will delete stopped event job * * @param jobId event job id - * @return true if stop success, false if stop failed */ - Boolean stopJob(Long jobId); + void stopJob(Long jobId); + + void stopJob(String dbName, String jobName, JobCategory jobCategory) throws DdlException; /** * if job is paused, resume it * * @param jobId job id - * @return true if resume success, false if resume failed */ - Boolean resumeJob(Long jobId); + void resumeJob(Long jobId); + + Long registerJob(Job job) throws DdlException; + + List getJobs(String dbFullName, String jobName, JobCategory jobCategory, PatternMatcher matcher); /** * close job scheduler register diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index 62e7c6f339..501ae85311 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -109,6 +109,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("auto_increment", new Integer(SqlParserSymbols.KW_AUTO_INCREMENT)); keywordMap.put("as", new Integer(SqlParserSymbols.KW_AS)); keywordMap.put("asc", new Integer(SqlParserSymbols.KW_ASC)); + keywordMap.put("at", new Integer(SqlParserSymbols.KW_AT)); keywordMap.put("authors", new Integer(SqlParserSymbols.KW_AUTHORS)); keywordMap.put("backend", new Integer(SqlParserSymbols.KW_BACKEND)); keywordMap.put("backends", new Integer(SqlParserSymbols.KW_BACKENDS)); @@ -196,6 +197,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("distributed", new Integer(SqlParserSymbols.KW_DISTRIBUTED)); keywordMap.put("distribution", new Integer(SqlParserSymbols.KW_DISTRIBUTION)); keywordMap.put("div", new Integer(SqlParserSymbols.KW_DIV)); + keywordMap.put("do", new Integer(SqlParserSymbols.KW_DO)); keywordMap.put("double", new Integer(SqlParserSymbols.KW_DOUBLE)); keywordMap.put("drop", new Integer(SqlParserSymbols.KW_DROP)); keywordMap.put("dropp", new Integer(SqlParserSymbols.KW_DROPP)); @@ -206,11 +208,13 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("encryptkey", new Integer(SqlParserSymbols.KW_ENCRYPTKEY)); keywordMap.put("encryptkeys", new Integer(SqlParserSymbols.KW_ENCRYPTKEYS)); keywordMap.put("end", new Integer(SqlParserSymbols.KW_END)); + keywordMap.put("ends", new Integer(SqlParserSymbols.KW_ENDS)); keywordMap.put("engine", new Integer(SqlParserSymbols.KW_ENGINE)); keywordMap.put("engines", new Integer(SqlParserSymbols.KW_ENGINES)); keywordMap.put("enter", new Integer(SqlParserSymbols.KW_ENTER)); keywordMap.put("errors", new Integer(SqlParserSymbols.KW_ERRORS)); keywordMap.put("events", new Integer(SqlParserSymbols.KW_EVENTS)); + keywordMap.put("every", new Integer(SqlParserSymbols.KW_EVERY)); keywordMap.put("except", new Integer(SqlParserSymbols.KW_EXCEPT)); keywordMap.put("exclude", new Integer(SqlParserSymbols.KW_EXCLUDE)); keywordMap.put("exists", new Integer(SqlParserSymbols.KW_EXISTS)); @@ -279,6 +283,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("isnull", new Integer(SqlParserSymbols.KW_ISNULL)); keywordMap.put("isolation", new Integer(SqlParserSymbols.KW_ISOLATION)); keywordMap.put("job", new Integer(SqlParserSymbols.KW_JOB)); + keywordMap.put("jobs", new Integer(SqlParserSymbols.KW_JOBS)); keywordMap.put("join", new Integer(SqlParserSymbols.KW_JOIN)); keywordMap.put("json", new Integer(SqlParserSymbols.KW_JSON)); keywordMap.put("jsonb", new Integer(SqlParserSymbols.KW_JSONB)); @@ -410,6 +415,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("rows", new Integer(SqlParserSymbols.KW_ROWS)); keywordMap.put("s3", new Integer(SqlParserSymbols.KW_S3)); keywordMap.put("schema", new Integer(SqlParserSymbols.KW_SCHEMA)); + keywordMap.put("scheduler", new Integer(SqlParserSymbols.KW_SCHEDULER)); keywordMap.put("schemas", new Integer(SqlParserSymbols.KW_SCHEMAS)); keywordMap.put("second", new Integer(SqlParserSymbols.KW_SECOND)); keywordMap.put("select", new Integer(SqlParserSymbols.KW_SELECT)); @@ -428,11 +434,13 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("sql_block_rule", new Integer(SqlParserSymbols.KW_SQL_BLOCK_RULE)); keywordMap.put("sample", new Integer(SqlParserSymbols.KW_SAMPLE)); keywordMap.put("start", new Integer(SqlParserSymbols.KW_START)); + keywordMap.put("starts", new Integer(SqlParserSymbols.KW_STARTS)); keywordMap.put("stats", new Integer(SqlParserSymbols.KW_STATS)); keywordMap.put("status", new Integer(SqlParserSymbols.KW_STATUS)); keywordMap.put("stop", new Integer(SqlParserSymbols.KW_STOP)); keywordMap.put("storage", new Integer(SqlParserSymbols.KW_STORAGE)); keywordMap.put("stream", new Integer(SqlParserSymbols.KW_STREAM)); + keywordMap.put("streaming", new Integer(SqlParserSymbols.KW_STREAMING)); keywordMap.put("string", new Integer(SqlParserSymbols.KW_STRING)); keywordMap.put("struct", new Integer(SqlParserSymbols.KW_STRUCT)); keywordMap.put("sum", new Integer(SqlParserSymbols.KW_SUM)); @@ -446,6 +454,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("tablet", new Integer(SqlParserSymbols.KW_TABLET)); keywordMap.put("tablets", new Integer(SqlParserSymbols.KW_TABLETS)); keywordMap.put("task", new Integer(SqlParserSymbols.KW_TASK)); + keywordMap.put("tasks", new Integer(SqlParserSymbols.KW_TASKS)); keywordMap.put("temporary", new Integer(SqlParserSymbols.KW_TEMPORARY)); keywordMap.put("terminated", new Integer(SqlParserSymbols.KW_TERMINATED)); keywordMap.put("text", new Integer(SqlParserSymbols.KW_TEXT)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateJobStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateJobStmtTest.java new file mode 100644 index 0000000000..deca01acf4 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateJobStmtTest.java @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.SqlParserUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.StringReader; + +public class CreateJobStmtTest { + + @Test + public void createOnceTimeJobStmt() throws Exception { + String sql = "CREATE JOB job1 ON SCHEDULER AT \"2023-02-15\" DO SELECT * FROM `address` ;"; + CreateJobStmt jobStmt = sqlParse(sql); + System.out.println(jobStmt.getStmt().toSql()); + Assertions.assertEquals("SELECT * FROM `address`", jobStmt.getStmt().toSql()); + + String badExecuteSql = "CREATE JOB job1 ON SCHEDULER AT \"2023-02-15\" DO selects * from address ;"; + Assertions.assertThrows(AnalysisException.class, () -> { + sqlParse(badExecuteSql); + }); + String badSql = "CREATE JOB job1 ON SCHEDULER AT \"2023-02-15\" STARTS \"2023-02-15\" DO selects * from address ;"; + Assertions.assertThrows(AnalysisException.class, () -> { + sqlParse(badSql); + }); + } + + private CreateJobStmt sqlParse(String sql) throws Exception { + org.apache.doris.analysis.SqlScanner input = new org.apache.doris.analysis.SqlScanner(new StringReader(sql)); + org.apache.doris.analysis.SqlParser parser = new org.apache.doris.analysis.SqlParser(input); + CreateJobStmt jobStmt = (CreateJobStmt) SqlParserUtils.getStmt(parser, 0); + return jobStmt; + } + + + @Test + public void createCycleJob() throws Exception { + String sql = "CREATE JOB job1 ON SCHEDULER EVERY 1 SECOND STARTS \"2023-02-15\" DO SELECT * FROM `address` ;"; + CreateJobStmt jobStmt = sqlParse(sql); + Assertions.assertEquals("SELECT * FROM `address`", jobStmt.getStmt().toSql()); + sql = "CREATE JOB job1 ON SCHEDULER EVERY 1 SECOND ENDS \"2023-02-15\" DO SELECT * FROM `address` ;"; + jobStmt = sqlParse(sql); + Assertions.assertEquals("SELECT * FROM `address`", jobStmt.getStmt().toSql()); + sql = "CREATE JOB job1 ON SCHEDULER EVERY 1 SECOND STARTS \"2023-02-15\" ENDS \"2023-02-16\" DO SELECT * FROM `address` ;"; + jobStmt = sqlParse(sql); + Assertions.assertEquals("SELECT * FROM `address`", jobStmt.getStmt().toSql()); + sql = "CREATE JOB job1 ON SCHEDULER EVERY 1 SECOND DO SELECT * FROM `address` ;"; + jobStmt = sqlParse(sql); + Assertions.assertEquals("SELECT * FROM `address`", jobStmt.getStmt().toSql()); + String badExecuteSql = "CREATE JOB job1 ON SCHEDULER AT \"2023-02-15\" DO selects * from address ;"; + Assertions.assertThrows(AnalysisException.class, () -> { + sqlParse(badExecuteSql); + }); + String badSql = "CREATE JOB job1 ON SCHEDULER AT \"2023-02-15\" STARTS \"2023-02-15\" DO selects * from address ;"; + Assertions.assertThrows(AnalysisException.class, () -> { + sqlParse(badSql); + }); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/AsyncJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/AsyncJobManagerTest.java index dceb8049cd..777d543408 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/AsyncJobManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/AsyncJobManagerTest.java @@ -17,11 +17,17 @@ package org.apache.doris.scheduler.disruptor; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.DdlException; +import org.apache.doris.persist.EditLog; +import org.apache.doris.scheduler.constants.JobCategory; import org.apache.doris.scheduler.executor.JobExecutor; -import org.apache.doris.scheduler.job.AsyncJobManager; import org.apache.doris.scheduler.job.Job; +import org.apache.doris.scheduler.manager.AsyncJobManager; import lombok.extern.slf4j.Slf4j; +import mockit.Expectations; +import mockit.Mocked; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -37,25 +43,42 @@ public class AsyncJobManagerTest { AsyncJobManager asyncJobManager; + @Mocked + EditLog editLog; + private static AtomicInteger testExecuteCount = new AtomicInteger(0); Job job = new Job("test", 6000L, null, null, new TestExecutor()); @BeforeEach public void init() { + job.setCycleJob(true); + job.setJobCategory(JobCategory.COMMON); testExecuteCount.set(0); asyncJobManager = new AsyncJobManager(); } @Test - public void testCycleScheduler() { + public void testCycleScheduler(@Mocked Env env) throws DdlException { + setContext(env); asyncJobManager.registerJob(job); //consider the time of the first execution and give some buffer time Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> testExecuteCount.get() >= 3); } + private void setContext(Env env) { + new Expectations() { + { + env.getEditLog(); + result = editLog; + editLog.logCreateJob((Job) any); + } + }; + } + @Test - public void testCycleSchedulerAndStop() { + public void testCycleSchedulerAndStop(@Mocked Env env) throws DdlException { + setContext(env); asyncJobManager.registerJob(job); long startTime = System.currentTimeMillis(); Awaitility.await().atMost(8, TimeUnit.SECONDS).until(() -> testExecuteCount.get() >= 1); @@ -63,27 +86,29 @@ public class AsyncJobManagerTest { //consider the time of the first execution and give some buffer time Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> System.currentTimeMillis() >= startTime + 13000L); Assertions.assertEquals(1, testExecuteCount.get()); - } @Test - public void testCycleSchedulerWithIncludeStartTimeAndEndTime() { - job.setStartTimestamp(System.currentTimeMillis() + 6000L); + public void testCycleSchedulerWithIncludeStartTimeAndEndTime(@Mocked Env env) throws DdlException { + setContext(env); + job.setStartTimeMs(System.currentTimeMillis() + 6000L); long endTimestamp = System.currentTimeMillis() + 19000L; - job.setEndTimestamp(endTimestamp); + job.setEndTimeMs(endTimestamp); asyncJobManager.registerJob(job); //consider the time of the first execution and give some buffer time Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> System.currentTimeMillis() >= endTimestamp + 12000L); - Assertions.assertEquals(2, testExecuteCount.get()); + Assertions.assertEquals(3, testExecuteCount.get()); } @Test - public void testCycleSchedulerWithIncludeEndTime() { + public void testCycleSchedulerWithIncludeEndTime(@Mocked Env env) throws DdlException { + setContext(env); long endTimestamp = System.currentTimeMillis() + 13000; - job.setEndTimestamp(endTimestamp); + job.setEndTimeMs(endTimestamp); asyncJobManager.registerJob(job); + //consider the time of the first execution and give some buffer time Awaitility.await().atMost(36, TimeUnit.SECONDS).until(() -> System.currentTimeMillis() >= endTimestamp + 12000L); @@ -91,10 +116,26 @@ public class AsyncJobManagerTest { } @Test - public void testCycleSchedulerWithIncludeStartTime() { + public void testCycleSchedulerWithIncludeStartTime(@Mocked Env env) throws DdlException { + setContext(env); long startTimestamp = System.currentTimeMillis() + 6000L; - job.setStartTimestamp(startTimestamp); + job.setStartTimeMs(startTimestamp); + asyncJobManager.registerJob(job); + //consider the time of the first execution and give some buffer time + Awaitility.await().atMost(14, TimeUnit.SECONDS).until(() -> System.currentTimeMillis() + >= startTimestamp + 7000L); + Assertions.assertEquals(2, testExecuteCount.get()); + } + + @Test + public void testOneTimeJob(@Mocked Env env) throws DdlException { + setContext(env); + + long startTimestamp = System.currentTimeMillis() + 3000L; + job.setIntervalMs(0L); + job.setStartTimeMs(startTimestamp); + job.setCycleJob(false); asyncJobManager.registerJob(job); //consider the time of the first execution and give some buffer time Awaitility.await().atMost(14, TimeUnit.SECONDS).until(() -> System.currentTimeMillis() @@ -109,7 +150,7 @@ public class AsyncJobManagerTest { class TestExecutor implements JobExecutor { @Override - public Boolean execute() { + public Boolean execute(Job job) { log.info("test execute count:{}", testExecuteCount.incrementAndGet()); return true; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java new file mode 100644 index 0000000000..ad5f740907 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/JobTest.java @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.scheduler.disruptor; + +import org.apache.doris.scheduler.common.IntervalUnit; +import org.apache.doris.scheduler.constants.JobCategory; +import org.apache.doris.scheduler.executor.SqlJobExecutor; +import org.apache.doris.scheduler.job.Job; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +public class JobTest { + + private static Job job; + + @BeforeAll + public static void init() { + SqlJobExecutor sqlJobExecutor = new SqlJobExecutor("insert into test values(1);"); + job = new Job("insertTest", 1000L, System.currentTimeMillis(), System.currentTimeMillis() + 100000, sqlJobExecutor); + job.setCycleJob(true); + job.setComment("test"); + job.setOriginInterval(10L); + job.setIntervalUnit(IntervalUnit.SECOND); + job.setUser("root"); + job.setDbName("test"); + job.setTimezone("Asia/Shanghai"); + job.setJobCategory(JobCategory.SQL); + } + + @Test + public void testSerialization() throws IOException { + Path path = Paths.get("./scheduler-jobs"); + Files.deleteIfExists(path); + Files.createFile(path); + DataOutputStream dos = new DataOutputStream(Files.newOutputStream(path)); + job.write(dos); + dos.flush(); + dos.close(); + DataInputStream dis = new DataInputStream(Files.newInputStream(path)); + Job readJob = Job.readFields(dis); + Assertions.assertEquals(job.getJobName(), readJob.getJobName()); + Assertions.assertEquals(job.getTimezone(), readJob.getTimezone()); + + } + + @AfterAll + public static void clean() throws IOException { + Path path = Paths.get("./scheduler-jobs"); + Files.deleteIfExists(path); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptorTest.java b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptorTest.java index 1630b1f864..3e5ce7f117 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerTaskDisruptorTest.java @@ -18,8 +18,8 @@ package org.apache.doris.scheduler.disruptor; import org.apache.doris.scheduler.executor.JobExecutor; -import org.apache.doris.scheduler.job.AsyncJobManager; import org.apache.doris.scheduler.job.Job; +import org.apache.doris.scheduler.manager.AsyncJobManager; import mockit.Expectations; import mockit.Injectable; @@ -30,7 +30,6 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.UUID; import java.util.concurrent.TimeUnit; public class TimerTaskDisruptorTest { @@ -56,7 +55,7 @@ public class TimerTaskDisruptorTest { asyncJobManager.getJob(anyLong); result = job; }}; - timerTaskDisruptor.tryPublish(job.getJobId(), UUID.randomUUID().getMostSignificantBits()); + timerTaskDisruptor.tryPublish(job.getJobId()); Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> testEventExecuteFlag); Assertions.assertTrue(testEventExecuteFlag); } @@ -64,7 +63,7 @@ public class TimerTaskDisruptorTest { class TestExecutor implements JobExecutor { @Override - public Boolean execute() { + public Boolean execute(Job job) { testEventExecuteFlag = true; return true; }