From cbf1f8620a4be7117d81edaf0a130d5a5ac19c67 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Wed, 6 Dec 2023 10:44:09 +0800 Subject: [PATCH] [Feature](job)support cancel task and fix log invalid (#27703) - Running task can be show and fix cancel fail - When the insert task scheduling cycle is reached, if there are still tasks running, the scheduling of this task will be canceled at this time. - refactor job status changes SQL - Fix timer job window error - Support cancel task --- .../Create/CREATE-JOB.md | 48 ++++++++++-- .../java/org/apache/doris/common/Config.java | 13 +++- fe/fe-core/src/main/cup/sql_parser.cup | 34 ++++---- ...meJobStmt.java => AlterJobStatusStmt.java} | 52 ++++++------- .../doris/analysis/CancelJobTaskStmt.java | 72 +++++++++++++++++ .../apache/doris/analysis/CreateJobStmt.java | 30 +++++++- .../apache/doris/analysis/PauseJobStmt.java | 69 ----------------- .../apache/doris/analysis/StopJobStmt.java | 49 ------------ .../java/org/apache/doris/catalog/Env.java | 2 +- .../apache/doris/job/base/AbstractJob.java | 77 +++++++++++++++---- .../java/org/apache/doris/job/base/Job.java | 21 +++-- .../apache/doris/job/common/IntervalUnit.java | 3 +- .../apache/doris/job/common/TaskStatus.java | 4 +- .../doris/job/disruptor/TimerJobEvent.java | 4 +- .../executor/DefaultTaskExecutorHandler.java | 4 +- .../job/executor/DispatchTaskHandler.java | 22 ++++-- .../job/executor/TimerJobSchedulerTask.java | 14 ++-- .../job/extensions/insert/InsertJob.java | 24 +++--- .../job/extensions/insert/InsertTask.java | 30 +++++++- .../doris/job/extensions/mtmv/MTMVJob.java | 7 +- .../apache/doris/job/manager/JobManager.java | 54 +++++++------ .../manager/TaskDisruptorGroupManager.java | 10 +-- .../doris/job/manager/TaskTokenManager.java | 4 +- .../doris/job/scheduler/JobScheduler.java | 57 +++++++++----- .../apache/doris/job/task/AbstractTask.java | 26 ++++--- .../java/org/apache/doris/job/task/Task.java | 2 +- .../org/apache/doris/mtmv/MTMVJobManager.java | 6 +- .../trees/plans/commands/InsertExecutor.java | 4 + .../commands/InsertIntoTableCommand.java | 3 + .../java/org/apache/doris/qe/DdlExecutor.java | 26 ++----- .../org/apache/doris/qe/ShowExecutor.java | 2 +- .../org/apache/doris/qe/StmtExecutor.java | 3 + .../base/JobExecutionConfigurationTest.java | 14 ++-- regression-test/pipeline/p0/conf/fe.conf | 4 + .../suites/job_p0/test_base_insert_job.groovy | 60 +++++++++++---- 35 files changed, 529 insertions(+), 325 deletions(-) rename fe/fe-core/src/main/java/org/apache/doris/analysis/{ResumeJobStmt.java => AlterJobStatusStmt.java} (51%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/CancelJobTaskStmt.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/StopJobStmt.java diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md index c06d48a4d0..1202bbff42 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-JOB.md @@ -34,12 +34,28 @@ CREATE JOB ### Description Doris Job 是根据既定计划运行的任务,用于在特定时间或指定时间间隔触发预定义的操作,从而帮助我们自动执行一些任务。从功能上来讲,它类似于操作系统上的 -定时任务(如:Linux 中的 cron、Windows 中的计划任务)。但 Doris 的 Job 调度可以精确到秒级。 +定时任务(如:Linux 中的 cron、Windows 中的计划任务)。 -Job 有两种类型:`ONE_TIME` 和 `BATCH`。其中 `ONE_TIME` 类型的 Job 会在指定的时间点触发,它主要用于一次性任务,而 `BATCH` 类型的 Job 会在指定的时间间隔内循环触发。 -主要用于周期性执行的任务。 +Job 有两种类型:`ONE_TIME` 和 `RECURRING`。其中 `ONE_TIME` 类型的 Job 会在指定的时间点触发,它主要用于一次性任务,而 `RECURRING` 类型的 Job 会在指定的时间间隔内循环触发, 此方式主要用于周期性执行的任务。 +`RECURRING` 类型的 Job 可指定开始时间,结束时间,即 `STARTS\ENDS`, 如果不指定开始时间,则默认首次执行时间为当前时间 + 一次调度周期。如果指定结束时间,则 task 执行完成如果达到结束时间(或超过,或下次执行周期会超过结束时间)则更新为FINISHED状态,此时不会再产生 Task。 + +JOB 共4种状态(`RUNNING`,`STOPPED`,`PAUSED`,`FINISHED`,),初始状态为RUNNING,RUNNING状态的JOB会根据既定的调度周期去生成 TASK 执行,Job 执行完成达到结束时间则状态变更为 `FINISHED`. + +RUNNING 状态的JOB 可以被 pause,即暂停,此时不会再生成 Task。 + +PAUSE状态的 JOB 可以通过 RESUME 操作来恢复运行,更改为RUNNING状态。 + +STOP 状态的 JOB 由用户主动触发,此时会 Cancel 正在运行中的作业,然后删除 JOB。 + +Finished 状态的 JOB 会保留在系统中 24 H,24H 后会被删除。 + +JOB 只描述作业信息, 执行会生成 TASK, TASK 状态分为 PENDING,RUNNING,SUCCEESS,FAILED,CANCELED +PENDING 表示到达触发时间了但是等待资源 RUN, 分配到资源后状态变更为RUNNING ,执行成功/失败即变更为 SUCCESS/FAILED. +CANCELED 即取消状态 ,TASK持久化最终状态,即SUCCESS/FAILED,其他状态运行中可以查到,但是如果重启则不可见。TASK只保留最新的 100 条记录。 + +- 目前仅支持 ***ADMIN*** 权限执行此操作。 +- 目前仅支持 ***INSERT 内表*** -目前仅支持 ***ADMIN*** 权限执行此操作。 语法: @@ -59,8 +75,7 @@ schedule: { } interval: - quantity { DAY | HOUR | MINUTE | - WEEK | SECOND } + quantity { WEEK | DAY | HOUR | MINUTE } ``` 一条有效的 Job 语句必须包含以下内容 @@ -88,7 +103,7 @@ SCHEDULER 语句用于定义作业的执行时间,频率以及持续时间, - interval - 用于指定作业执行频率,它可以是天、小时、分钟、秒、周。例如:` 1 DAY` 表示每天执行一次,` 1 HOUR` 表示每小时执行一次,` 1 MINUTE` 表示每分钟执行一次,` 1 WEEK` 表示每周执行一次,` 1 SECOND` 表示每秒执行一次。 + 用于指定作业执行频率,它可以是天、小时、分钟、周。例如:` 1 DAY` 表示每天执行一次,` 1 HOUR` 表示每小时执行一次,` 1 MINUTE` 表示每分钟执行一次,` 1 WEEK` 表示每周执行一次。 - STARTS timestamp @@ -122,6 +137,25 @@ CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS '2020-01-01 00:00:00' DO INSERT ```sql CREATE JOB my_job ON SCHEDULER EVERY 1 DAY STARTS '2020-01-01 00:00:00' ENDS '2020-01-01 00:10:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 create_time >= days_add(now(),-1); ``` +### INSERT JOB +目前仅支持 ***INSERT 内表*** + +### CONFIG + +fe.conf + +- job_dispatch_timer_job_thread_num, 用于分发定时任务的线程数, 默认值2,如果含有大量周期执行任务,可以调大这个参数。 + +- job_dispatch_timer_job_queue_size, 任务堆积时用于存放定时任务的队列大小,默认值 1024. 如果有大量任务同一时间触发,可以调大这个参数。否则会导致队列满,提交任务会进入阻塞状态,从而导致后续任务无法提交。 + +- finished_job_cleanup_threshold_time_hour, 用于清理已完成的任务的时间阈值,单位为小时,默认值为24小时。 + +- job_insert_task_consumer_thread_num = 10;用于执行 Insert 任务的线程数, 值应该大于0,否则默认为5. + +### Best Practice + +合理的进行 Job 的管理,避免大量的 Job 同时触发,导致任务堆积,从而影响系统的正常运行。 +任务的执行间隔应该设置在一个合理的范围,至少应该大于任务执行时间。 ### Keywords diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index d6be0fb919..d350a44c9f 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1570,7 +1570,7 @@ public class Config extends ConfigBase { */ @ConfField(description = {"用于分发定时任务的线程数", "The number of threads used to dispatch timer job."}) - public static int job_dispatch_timer_job_thread_num = 5; + public static int job_dispatch_timer_job_thread_num = 2; /** * The number of timer jobs that can be queued. @@ -1582,6 +1582,10 @@ public class Config extends ConfigBase { @ConfField(description = {"任务堆积时用于存放定时任务的队列大小", "The number of timer jobs that can be queued."}) public static int job_dispatch_timer_job_queue_size = 1024; + @ConfField(description = {"finished 状态的 job 最长保存时间,超过这个时间将会被删除, 单位:小时", + "The longest time to save the job in finished status, it will be deleted after this time. Unit: hour"}) + public static int finished_job_cleanup_threshold_time_hour = 24; + @ConfField(description = {"用于执行 Insert 任务的线程数,值应该大于0,否则默认为5", "The number of threads used to consume Insert tasks, " + "the value should be greater than 0, if it is <=0, default is 5."}) @@ -1592,6 +1596,13 @@ public class Config extends ConfigBase { + "the value should be greater than 0, if it is <=0, default is 5."}) public static int job_mtmv_task_consumer_thread_num = 10; + /* job test config */ + /** + * If set to true, we will allow the interval unit to be set to second, when creating a recurring job. + */ + @ConfField + public static boolean enable_job_schedule_second_for_test = false; + /*---------------------- JOB CONFIG END------------------------*/ /** * The number of async tasks that can be queued. @See TaskDisruptor diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 6334f80207..bb6c09e1d4 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -688,7 +688,7 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt, create_routine_load_stmt, pause_routine_load_stmt, resume_routine_load_stmt, stop_routine_load_stmt, show_routine_load_stmt, show_routine_load_task_stmt, show_create_routine_load_stmt, show_create_load_stmt, show_create_reporitory_stmt, describe_stmt, alter_stmt, unset_var_stmt, - create_job_stmt,pause_job_stmt,resume_job_stmt,stop_job_stmt,show_job_stmt, + create_job_stmt,pause_job_stmt,resume_job_stmt,stop_job_stmt,show_job_stmt,cancel_job_task_stmt, use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt, switch_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt, import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt, @@ -1157,6 +1157,8 @@ stmt ::= {: RESULT = stmt; :} | pause_job_stmt : stmt {: RESULT = stmt; :} + | cancel_job_task_stmt : stmt + {: RESULT = stmt; :} | show_job_stmt : stmt {: RESULT = stmt; :} | stop_job_stmt : stmt @@ -2589,13 +2591,7 @@ create_job_stmt ::= {: RESULT = endTime; :} - ; -resume_job_stmt ::= - KW_RESUME KW_JOB KW_FOR job_label:jobLabel - {: - RESULT = new ResumeJobStmt(jobLabel); - :} - ; + ; show_job_stmt ::= KW_SHOW KW_JOBS {: @@ -2623,18 +2619,30 @@ show_job_stmt ::= :} ; pause_job_stmt ::= - KW_PAUSE KW_JOB KW_FOR job_label:jobLabel + KW_PAUSE KW_JOB opt_wild_where {: - RESULT = new PauseJobStmt(jobLabel); + RESULT = new AlterJobStatusStmt(parser.where,org.apache.doris.job.common.JobStatus.PAUSED); :} ; stop_job_stmt ::= - KW_STOP KW_JOB KW_FOR job_label:jobLabel + KW_DROP KW_JOB opt_wild_where {: - RESULT = new StopJobStmt(jobLabel); + RESULT = new AlterJobStatusStmt(parser.where,org.apache.doris.job.common.JobStatus.STOPPED); :} - ; + ; +resume_job_stmt ::= + KW_RESUME KW_JOB opt_wild_where + {: + RESULT = new AlterJobStatusStmt(parser.where,org.apache.doris.job.common.JobStatus.RUNNING); + :} + ; +cancel_job_task_stmt ::= + KW_CANCEL KW_TASK opt_wild_where + {: + RESULT = new CancelJobTaskStmt(parser.where); + :} + ; // Routine load statement create_routine_load_stmt ::= KW_CREATE KW_ROUTINE KW_LOAD job_label:jobLabel optional_on_ident:tableName diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterJobStatusStmt.java similarity index 51% rename from fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeJobStmt.java rename to fe/fe-core/src/main/java/org/apache/doris/analysis/AlterJobStatusStmt.java index 725d24f47a..09f135d2fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ResumeJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterJobStatusStmt.java @@ -17,47 +17,47 @@ package org.apache.doris.analysis; -import org.apache.doris.cluster.ClusterNamespace; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.job.common.JobStatus; import com.google.common.base.Strings; +import lombok.Getter; -public class ResumeJobStmt extends DdlStmt { +public class AlterJobStatusStmt extends DdlStmt { - private final LabelName labelName; + private Expr expr; - private String db; + private static final String columnName = "jobName"; - public ResumeJobStmt(LabelName labelName) { - this.labelName = labelName; - } + @Getter + private String jobName; - public boolean isAll() { - return labelName == null; - } + @Getter + private JobStatus jobStatus; - public String getName() { - return labelName.getLabelName(); - } - - public String getDbFullName() { - return db; + public AlterJobStatusStmt(Expr whereClause, JobStatus jobStatus) { + this.expr = whereClause; + this.jobStatus = jobStatus; } @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); CreateJobStmt.checkAuth(); - if (labelName != null) { - labelName.analyze(analyzer); - db = labelName.getDbName(); - } else { - if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); - } - db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb()); + String inputCol = ((SlotRef) expr.getChild(0)).getColumnName(); + if (!inputCol.equalsIgnoreCase(columnName)) { + throw new AnalysisException("Current not support " + inputCol); } + if (!(expr.getChild(1) instanceof StringLiteral)) { + throw new AnalysisException("Value must is string"); + } + + String inputValue = expr.getChild(1).getStringValue(); + if (Strings.isNullOrEmpty(inputValue)) { + throw new AnalysisException("Value can't is null"); + } + this.jobName = inputValue; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelJobTaskStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelJobTaskStmt.java new file mode 100644 index 0000000000..ea6febe163 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelJobTaskStmt.java @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/StringLiteral.java +// and modified by Doris + +package org.apache.doris.analysis; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; + +import lombok.Getter; + +public class CancelJobTaskStmt extends DdlStmt { + + @Getter + private String jobName; + + @Getter + private Long taskId; + + private Expr expr; + + private static final String jobNameKey = "jobName"; + + private static final String taskIdKey = "taskId"; + + public CancelJobTaskStmt(Expr whereExpr) { + this.expr = whereExpr; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + CreateJobStmt.checkAuth(); + CompoundPredicate compoundPredicate = (CompoundPredicate) expr; + if (!compoundPredicate.getOp().equals(CompoundPredicate.Operator.AND)) { + throw new AnalysisException("Only allow compound predicate with operator AND"); + } + String jobNameInput = ((SlotRef) compoundPredicate.getChildren().get(0).getChild(0)).getColumnName(); + if (!jobNameKey.equalsIgnoreCase(jobNameInput)) { + throw new AnalysisException("Current not support " + jobNameInput); + } + + if (!(compoundPredicate.getChildren().get(0).getChild(1) instanceof StringLiteral)) { + throw new AnalysisException("JobName value must is string"); + } + this.jobName = compoundPredicate.getChildren().get(0).getChild(1).getStringValue(); + String taskIdInput = ((SlotRef) compoundPredicate.getChildren().get(1).getChild(0)).getColumnName(); + if (!taskIdKey.equalsIgnoreCase(taskIdInput)) { + throw new AnalysisException("Current not support " + taskIdInput); + } + if (!(compoundPredicate.getChildren().get(1).getChild(1) instanceof IntLiteral)) { + throw new AnalysisException("task id value must is large int"); + } + this.taskId = ((IntLiteral) compoundPredicate.getChildren().get(1).getChild(1)).getLongValue(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java index a1c008fb95..1a44ee4f90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateJobStmt.java @@ -19,6 +19,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; @@ -39,6 +40,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import java.util.HashSet; +import java.util.Set; /** * syntax: @@ -66,7 +68,7 @@ public class CreateJobStmt extends DdlStmt { private StatementBase doStmt; @Getter - private AbstractJob jobInstance; + private AbstractJob jobInstance; private final LabelName labelName; @@ -83,6 +85,13 @@ public class CreateJobStmt extends DdlStmt { private final String comment; private JobExecuteType executeType; + // exclude job name prefix, which is used by inner job + private final Set excludeJobNamePrefix = new HashSet<>(); + + { + excludeJobNamePrefix.add("inner_mtmv_"); + } + private static final ImmutableSet> supportStmtSuperClass = new ImmutableSet.Builder>().add(InsertStmt.class) .add(UpdateStmt.class).build(); @@ -126,7 +135,15 @@ public class CreateJobStmt extends DdlStmt { timerDefinition.setInterval(interval); } if (null != intervalTimeUnit) { - timerDefinition.setIntervalUnit(IntervalUnit.valueOf(intervalTimeUnit.toUpperCase())); + IntervalUnit intervalUnit = IntervalUnit.fromString(intervalTimeUnit.toUpperCase()); + if (null == intervalUnit) { + throw new AnalysisException("interval time unit can not be " + intervalTimeUnit); + } + if (intervalUnit.equals(IntervalUnit.SECOND) + && !Config.enable_job_schedule_second_for_test) { + throw new AnalysisException("interval time unit can not be second"); + } + timerDefinition.setIntervalUnit(intervalUnit); } if (null != startsTimeStamp) { timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp)); @@ -134,6 +151,7 @@ public class CreateJobStmt extends DdlStmt { if (null != endsTimeStamp) { timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(endsTimeStamp)); } + checkJobName(labelName.getLabelName()); jobExecutionConfiguration.setTimerDefinition(timerDefinition); job.setJobConfig(jobExecutionConfiguration); @@ -151,6 +169,14 @@ public class CreateJobStmt extends DdlStmt { jobInstance = job; } + private void checkJobName(String jobName) throws AnalysisException { + for (String prefix : excludeJobNamePrefix) { + if (jobName.startsWith(prefix)) { + throw new AnalysisException("job name can not start with " + prefix); + } + } + } + protected static void checkAuth() throws AnalysisException { if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java deleted file mode 100644 index c399fc37cc..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/PauseJobStmt.java +++ /dev/null @@ -1,69 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.analysis; - -import org.apache.doris.cluster.ClusterNamespace; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.UserException; - -import com.google.common.base.Strings; - -/** - * syntax: - * PAUSE JOB FOR [database.]name - * we can pause a job by jobName - * it's only running job can be paused, and it will be paused immediately - * paused job can be resumed by RESUME EVENT FOR jobName - */ -public class PauseJobStmt extends DdlStmt { - - private final LabelName labelName; - private String db; - - public PauseJobStmt(LabelName labelName) { - this.labelName = labelName; - } - - public boolean isAll() { - return labelName == null; - } - - public String getName() { - return labelName.getLabelName(); - } - - public String getDbFullName() { - return db; - } - - @Override - public void analyze(Analyzer analyzer) throws UserException { - super.analyze(analyzer); - CreateJobStmt.checkAuth(); - if (labelName != null) { - labelName.analyze(analyzer); - db = labelName.getDbName(); - } else { - if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); - } - db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb()); - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StopJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StopJobStmt.java deleted file mode 100644 index afef3ef01b..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StopJobStmt.java +++ /dev/null @@ -1,49 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.analysis; - -import org.apache.doris.common.UserException; - -/** - * syntax: - * STOP JOB FOR [database.]name - * only run job can be stopped, and stopped job can't be resumed - */ -public class StopJobStmt extends DdlStmt { - - private final LabelName labelName; - - public StopJobStmt(LabelName labelName) { - this.labelName = labelName; - } - - public String getName() { - return labelName.getLabelName(); - } - - public String getDbFullName() { - return labelName.getDbName(); - } - - @Override - public void analyze(Analyzer analyzer) throws UserException { - super.analyze(analyzer); - CreateJobStmt.checkAuth(); - labelName.analyze(analyzer); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 2e5874fed1..61379c1b28 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -360,7 +360,7 @@ public class Env { private MetastoreEventsProcessor metastoreEventsProcessor; private ExportTaskRegister exportTaskRegister; - private JobManager jobManager; + private JobManager, ?> jobManager; private TransientTaskManager transientTaskManager; private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index ca98756f6d..83f02326d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -26,6 +26,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.common.TaskStatus; +import org.apache.doris.job.common.TaskType; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.persist.gson.GsonUtils; @@ -36,6 +37,7 @@ import org.apache.doris.thrift.TRow; import com.google.common.collect.ImmutableList; import com.google.gson.annotations.SerializedName; import lombok.Data; +import lombok.extern.log4j.Log4j2; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.RandomUtils; @@ -43,10 +45,14 @@ import java.io.DataInput; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; @Data -public abstract class AbstractJob implements Job, Writable { +@Log4j2 +public abstract class AbstractJob implements Job, Writable { @SerializedName(value = "jid") private Long jobId; @@ -75,6 +81,9 @@ public abstract class AbstractJob implements Job, Wri @SerializedName(value = "sql") String executeSql; + @SerializedName(value = "ftm") + private long finishTimeMs; + private List runningTasks = new ArrayList<>(); @Override @@ -106,7 +115,43 @@ public abstract class AbstractJob implements Job, Wri throw new JobException("no running task"); } runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst() - .orElseThrow(() -> new JobException("no task id:" + taskId)).cancel(); + .orElseThrow(() -> new JobException("no task id: " + taskId)).cancel(); + if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) { + updateJobStatus(JobStatus.FINISHED); + } + } + + public List queryAllTasks() { + List tasks = new ArrayList<>(); + if (CollectionUtils.isEmpty(runningTasks)) { + return queryTasks(); + } + + List historyTasks = queryTasks(); + if (CollectionUtils.isNotEmpty(historyTasks)) { + tasks.addAll(historyTasks); + } + Set loadTaskIds = tasks.stream().map(AbstractTask::getTaskId).collect(Collectors.toSet()); + runningTasks.forEach(task -> { + if (!loadTaskIds.contains(task.getTaskId())) { + tasks.add(task); + } + }); + Comparator taskComparator = Comparator.comparingLong(T::getCreateTimeMs).reversed(); + tasks.sort(taskComparator); + return tasks; + } + + public List commonCreateTasks(TaskType taskType, C taskContext) { + if (!getJobStatus().equals(JobStatus.RUNNING)) { + log.warn("job is not running, job id is {}", jobId); + return new ArrayList<>(); + } + if (!isReadyForScheduling(taskContext)) { + log.info("job is not ready for scheduling, job id is {}", jobId); + return new ArrayList<>(); + } + return createTasks(taskType, taskContext); } public void initTasks(List tasks) { @@ -133,21 +178,26 @@ public abstract class AbstractJob implements Job, Wri checkJobParamsInternal(); } - public void updateJobStatus(JobStatus newJobStatus) { + public void updateJobStatus(JobStatus newJobStatus) throws JobException { if (null == newJobStatus) { throw new IllegalArgumentException("jobStatus cannot be null"); } + String errorMsg = String.format("Can't update job %s status to the %s status", + jobStatus.name(), newJobStatus.name()); if (jobStatus == newJobStatus) { - throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status", - jobStatus.name(), this.jobStatus.name())); + throw new IllegalArgumentException(errorMsg); } if (newJobStatus.equals(JobStatus.RUNNING) && !jobStatus.equals(JobStatus.PAUSED)) { - throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status", - jobStatus.name(), this.jobStatus.name())); + throw new IllegalArgumentException(errorMsg); } if (newJobStatus.equals(JobStatus.STOPPED) && !jobStatus.equals(JobStatus.RUNNING)) { - throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status", - jobStatus.name(), this.jobStatus.name())); + throw new IllegalArgumentException(errorMsg); + } + if (newJobStatus.equals(JobStatus.FINISHED)) { + this.finishTimeMs = System.currentTimeMillis(); + } + if (JobStatus.PAUSED.equals(newJobStatus)) { + cancelAllTasks(); } jobStatus = newJobStatus; } @@ -157,25 +207,26 @@ public abstract class AbstractJob implements Job, Wri public static AbstractJob readFields(DataInput in) throws IOException { String jsonJob = Text.readString(in); - AbstractJob job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class); + AbstractJob job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class); job.setRunningTasks(new ArrayList<>()); return job; } @Override - public void onTaskFail(T task) { + public void onTaskFail(T task) throws JobException { updateJobStatusIfEnd(); runningTasks.remove(task); } @Override - public void onTaskSuccess(T task) { + public void onTaskSuccess(T task) throws JobException { updateJobStatusIfEnd(); runningTasks.remove(task); } - private void updateJobStatusIfEnd() { + + private void updateJobStatusIfEnd() throws JobException { JobExecuteType executeType = getJobConfig().getExecuteType(); if (executeType.equals(JobExecuteType.MANUAL)) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java index a530ce3b2a..ee352a0f41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java @@ -33,23 +33,29 @@ import java.util.List; * The job status is used to control the execution of the job. * * @param The type of task associated with the job, extending AbstractTask. + * The type of task context associated with the job */ -public interface Job { +public interface Job { /** * Creates a list of tasks of the specified type for this job. + * you can set task context for task, + * eg: insert task, execute sql is insert into table select * from table1 limit ${limit} + * every task context is different, eg: limit 1000, limit 2000,you can set task context to 1000,2000 + * it's used by manual task or streaming task * - * @param taskType The type of tasks to create. + * @param taskType The type of tasks to create. @See TaskType + * @param taskContext The context of tasks to create. * @return A list of tasks. */ - List createTasks(TaskType taskType); + List createTasks(TaskType taskType, C taskContext); /** * Cancels the task with the specified taskId. * * @param taskId The ID of the task to cancel. * @throws JobException If the task is not in the running state, it may have already - * finished and cannot be cancelled. + * finished and cannot be cancelled. */ void cancelTaskById(long taskId) throws JobException; @@ -60,7 +66,7 @@ public interface Job { * * @return True if the job is ready for scheduling, false otherwise. */ - boolean isReadyForScheduling(); + boolean isReadyForScheduling(C taskContext); /** * Retrieves the metadata for the job, which is used to display job information. @@ -103,17 +109,18 @@ public interface Job { * * @param task The failed task. */ - void onTaskFail(T task); + void onTaskFail(T task) throws JobException; /** * Notifies the job when a task execution is successful. * * @param task The successful task. */ - void onTaskSuccess(T task); + void onTaskSuccess(T task) throws JobException; /** * get the job's show info, which is used to sql show the job information + * * @return List job common show info */ List getShowInfo(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/IntervalUnit.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/IntervalUnit.java index 4c576e986f..dba324a81c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/common/IntervalUnit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/IntervalUnit.java @@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; public enum IntervalUnit { - SECOND("second", 0L, TimeUnit.SECONDS::toMillis), MINUTE("minute", 0L, TimeUnit.MINUTES::toMillis), HOUR("hour", 0L, TimeUnit.HOURS::toMillis), @@ -57,7 +56,7 @@ public enum IntervalUnit { return Arrays.stream(IntervalUnit.values()) .filter(config -> config.getUnit().equals(name)) .findFirst() - .orElseThrow(() -> new IllegalArgumentException("Unknown configuration " + name)); + .orElseThrow(() -> new IllegalArgumentException("Unknown configuration interval " + name)); } public Long getIntervalMs(Long interval) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java index b4040d31e0..4cb401f28d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/TaskStatus.java @@ -19,8 +19,8 @@ package org.apache.doris.job.common; public enum TaskStatus { PENDING, - CANCEL, + CANCELED, RUNNING, SUCCESS, - FAILD; + FAILED; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java index 218fefd041..65654c225f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TimerJobEvent.java @@ -23,12 +23,12 @@ import com.lmax.disruptor.EventFactory; import lombok.Data; @Data -public class TimerJobEvent> { +public class TimerJobEvent { private T job; - public static > EventFactory> factory() { + public static EventFactory> factory() { return TimerJobEvent::new; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java index 0e5911a4e8..a07e248af6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java @@ -21,7 +21,7 @@ import org.apache.doris.job.disruptor.ExecuteTaskEvent; import org.apache.doris.job.task.AbstractTask; import com.lmax.disruptor.WorkHandler; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; /** * DefaultTaskExecutor is an implementation of the TaskExecutor interface. @@ -30,7 +30,7 @@ import lombok.extern.slf4j.Slf4j; * It executes a given AbstractTask by acquiring a semaphore token from the TaskTokenManager * and releasing it after the task execution. */ -@Slf4j +@Log4j2 public class DefaultTaskExecutorHandler implements WorkHandler> { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java index 852ba13415..cb0a393d91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java @@ -26,8 +26,8 @@ import org.apache.doris.job.disruptor.TimerJobEvent; import org.apache.doris.job.task.AbstractTask; import com.lmax.disruptor.WorkHandler; -import jline.internal.Log; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; +import org.apache.commons.collections.CollectionUtils; import java.util.List; import java.util.Map; @@ -37,8 +37,8 @@ import java.util.Map; * when job is ready for scheduling and job status is running * we will create task and publish to task disruptor @see DefaultTaskExecutorHandler */ -@Slf4j -public class DispatchTaskHandler> implements WorkHandler> { +@Log4j2 +public class DispatchTaskHandler implements WorkHandler> { private final Map> disruptorMap; @@ -50,19 +50,27 @@ public class DispatchTaskHandler> implements WorkHandle @Override public void onEvent(TimerJobEvent event) { try { + log.info("dispatch timer job, job id is {}, job name is {}", event.getJob().getJobId(), + event.getJob().getJobName()); if (null == event.getJob()) { log.info("job is null,may be job is deleted, ignore"); return; } - if (event.getJob().isReadyForScheduling() && event.getJob().getJobStatus() == JobStatus.RUNNING) { - List tasks = event.getJob().createTasks(TaskType.SCHEDULED); + if (event.getJob().isReadyForScheduling(null) && event.getJob().getJobStatus() == JobStatus.RUNNING) { + List tasks = event.getJob().commonCreateTasks(TaskType.SCHEDULED, null); + if (CollectionUtils.isEmpty(tasks)) { + log.warn("job is ready for scheduling, but create task is empty, skip scheduler," + + "job id is {}," + " job name is {}", event.getJob().getJobId(), + event.getJob().getJobName()); + return; + } JobType jobType = event.getJob().getJobType(); for (AbstractTask task : tasks) { disruptorMap.get(jobType).publishEvent(task, event.getJob().getJobConfig()); } } } catch (Exception e) { - Log.warn("dispatch timer job error, task id is {}", event.getJob().getJobId(), e); + log.warn("dispatch timer job error, task id is {}", event.getJob().getJobId(), e); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java index 9bd4cbfeae..74efe49beb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java @@ -18,15 +18,15 @@ package org.apache.doris.job.executor; import org.apache.doris.job.base.AbstractJob; +import org.apache.doris.job.common.JobStatus; import org.apache.doris.job.disruptor.TaskDisruptor; import io.netty.util.Timeout; import io.netty.util.TimerTask; -import jline.internal.Log; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; -@Slf4j -public class TimerJobSchedulerTask> implements TimerTask { +@Log4j2 +public class TimerJobSchedulerTask implements TimerTask { private TaskDisruptor dispatchDisruptor; @@ -40,9 +40,13 @@ public class TimerJobSchedulerTask> implements TimerTas @Override public void run(Timeout timeout) { try { + if (!JobStatus.RUNNING.equals(job.getJobStatus())) { + log.info("job status is not running, job id is {}, skip dispatch", this.job.getJobId()); + return; + } dispatchDisruptor.publishEvent(this.job); } catch (Exception e) { - Log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e); + log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 619a5c7fde..968f141352 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -47,11 +47,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; @Data @Slf4j -public class InsertJob extends AbstractJob { +public class InsertJob extends AbstractJob { public static final ImmutableList SCHEMA = ImmutableList.of( new Column("Id", ScalarType.createStringType()), @@ -87,11 +88,11 @@ public class InsertJob extends AbstractJob { ConcurrentLinkedQueue taskIdList; // max save task num, do we need to config it? - private static final int MAX_SAVE_TASK_NUM = 50; - + private static final int MAX_SAVE_TASK_NUM = 100; @Override - public List createTasks(TaskType taskType) { + public List createTasks(TaskType taskType, Map taskContext) { + //nothing need to do in insert job InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser()); task.setJobId(getJobId()); task.setTaskType(taskType); @@ -123,17 +124,17 @@ public class InsertJob extends AbstractJob { super.cancelTaskById(taskId); } + @Override + public boolean isReadyForScheduling(Map taskContext) { + return CollectionUtils.isEmpty(getRunningTasks()); + } + @Override public void cancelAllTasks() throws JobException { super.cancelAllTasks(); } - @Override - public boolean isReadyForScheduling() { - return true; - } - @Override protected void checkJobParamsInternal() { @@ -162,8 +163,9 @@ public class InsertJob extends AbstractJob { InsertTask task; try { task = new InsertTask(loadJob.getLabel(), loadJob.getDb().getFullName(), null, getCreateUser()); + task.setCreateTimeMs(loadJob.getCreateTimestamp()); } catch (MetaNotFoundException e) { - log.warn("load job not found,job id is {}", loadJob.getId()); + log.warn("load job not found, job id is {}", loadJob.getId()); return; } task.setJobId(getJobId()); @@ -195,7 +197,7 @@ public class InsertJob extends AbstractJob { } @Override - public void onTaskSuccess(InsertTask task) { + public void onTaskSuccess(InsertTask task) throws JobException { super.onTaskSuccess(task); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index eb319f4d5e..69cdfe26b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -41,7 +41,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import lombok.Getter; import lombok.Setter; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; import java.util.ArrayList; import java.util.List; @@ -52,7 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * todo implement this later */ -@Slf4j +@Log4j2 public class InsertTask extends AbstractTask { public static final ImmutableList SCHEMA = ImmutableList.of( @@ -144,8 +144,14 @@ public class InsertTask extends AbstractTask { @Override public void run() throws JobException { try { + if (isCanceled.get()) { + log.info("task has been canceled, task id is {}", getTaskId()); + return; + } command.run(ctx, stmtExecutor); } catch (Exception e) { + log.warn("execute insert task error, job id is {}, task id is {},sql is {}", getJobId(), + getTaskId(), sql, e); throw new JobException(e); } } @@ -177,7 +183,7 @@ public class InsertTask extends AbstractTask { @Override public List getShowInfo() { if (null == loadJob) { - return new ArrayList<>(); + return getPendingTaskShowInfo(); } List jobInfo = Lists.newArrayList(); // jobId @@ -258,4 +264,22 @@ public class InsertTask extends AbstractTask { return trow; } + // if task not start, load job is null,return pending task show info + private List getPendingTaskShowInfo() { + List datas = new ArrayList<>(); + + datas.add(String.valueOf(getTaskId())); + datas.add(getJobId() + "_" + getTaskId()); + datas.add(getStatus().name()); + datas.add(FeConstants.null_string); + datas.add(FeConstants.null_string); + datas.add(FeConstants.null_string); + datas.add(TimeUtils.longToTimeString(getCreateTimeMs())); + datas.add(FeConstants.null_string); + datas.add(FeConstants.null_string); + datas.add(FeConstants.null_string); + datas.add(userIdentity.getQualifiedUser()); + return datas; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java index c00f97cd79..6321679b6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java @@ -47,8 +47,9 @@ import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; -public class MTMVJob extends AbstractJob { +public class MTMVJob extends AbstractJob { private static final Logger LOG = LogManager.getLogger(MTMVJob.class); private static final ShowResultSetMetaData JOB_META_DATA = ShowResultSetMetaData.builder() @@ -109,7 +110,7 @@ public class MTMVJob extends AbstractJob { } @Override - public List createTasks(TaskType taskType) { + public List createTasks(TaskType taskType, Map taskContext) { MTMVTask task = new MTMVTask(dbId, mtmvId); task.setTaskType(taskType); ArrayList tasks = new ArrayList<>(); @@ -119,7 +120,7 @@ public class MTMVJob extends AbstractJob { } @Override - public boolean isReadyForScheduling() { + public boolean isReadyForScheduling(Map taskContext) { return CollectionUtils.isEmpty(getRunningTasks()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index c7d04cdd28..776af152c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -29,7 +29,7 @@ import org.apache.doris.job.exception.JobException; import org.apache.doris.job.scheduler.JobScheduler; import org.apache.doris.job.task.AbstractTask; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; import org.apache.commons.lang3.StringUtils; import java.io.DataInput; @@ -39,8 +39,8 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -@Slf4j -public class JobManager> implements Writable { +@Log4j2 +public class JobManager, C> implements Writable { private final ConcurrentHashMap jobMap = new ConcurrentHashMap<>(32); @@ -54,9 +54,9 @@ public class JobManager> implements Writable { public void registerJob(T job) throws JobException { job.checkJobParams(); - checkJobNameExist(job.getJobName(), job.getJobType()); + checkJobNameExist(job.getJobName()); if (jobMap.get(job.getJobId()) != null) { - throw new JobException("job id exist,jobId:" + job.getJobId()); + throw new JobException("job id exist, jobId:" + job.getJobId()); } Env.getCurrentEnv().getEditLog().logCreateJob(job); jobMap.put(job.getJobId(), job); @@ -65,9 +65,9 @@ public class JobManager> implements Writable { } - private void checkJobNameExist(String jobName, JobType type) throws JobException { - if (jobMap.values().stream().anyMatch(a -> a.getJobName().equals(jobName) && a.getJobType().equals(type))) { - throw new JobException("job name exist,jobName:" + jobName); + private void checkJobNameExist(String jobName) throws JobException { + if (jobMap.values().stream().anyMatch(a -> a.getJobName().equals(jobName))) { + throw new JobException("job name exist, jobName:" + jobName); } } @@ -79,13 +79,13 @@ public class JobManager> implements Writable { jobMap.remove(jobId); } - public void unregisterJob(String jobName, JobType jobType) throws JobException { + public void unregisterJob(String jobName) throws JobException { for (T a : jobMap.values()) { - if (a.getJobName().equals(jobName) && a.getJobType().equals(jobType)) { + if (a.getJobName().equals(jobName)) { try { unregisterJob(a.getJobId()); } catch (JobException e) { - throw new JobException("unregister job error,jobName:" + jobName); + throw new JobException("unregister job error, jobName:" + jobName); } } } @@ -98,13 +98,17 @@ public class JobManager> implements Writable { Env.getCurrentEnv().getEditLog().logUpdateJob(jobMap.get(jobId)); } - public void alterJobStatus(String jobName, JobStatus jobStatus, JobType jobType) throws JobException { + public void alterJobStatus(String jobName, JobStatus jobStatus) throws JobException { for (T a : jobMap.values()) { - if (a.getJobName().equals(jobName) && jobType.equals(a.getJobType())) { + if (a.getJobName().equals(jobName)) { try { + if (jobStatus.equals(JobStatus.STOPPED)) { + unregisterJob(a.getJobId()); + return; + } alterJobStatus(a.getJobId(), jobStatus); } catch (JobException e) { - throw new JobException("unregister job error,jobName:" + jobName); + throw new JobException("unregister job error, jobName:" + jobName); } } } @@ -112,7 +116,7 @@ public class JobManager> implements Writable { private void checkJobExist(Long jobId) throws JobException { if (null == jobMap.get(jobId)) { - throw new JobException("job not exist,jobId:" + jobId); + throw new JobException("job not exist, jobId:" + jobId); } } @@ -129,6 +133,7 @@ public class JobManager> implements Writable { /** * query jobs by job type + * * @param jobTypes @JobType * @return List job list */ @@ -156,12 +161,12 @@ public class JobManager> implements Writable { public List queryTasks(Long jobId) throws JobException { checkJobExist(jobId); - return jobMap.get(jobId).queryTasks(); + return jobMap.get(jobId).queryAllTasks(); } - public void triggerJob(long jobId) throws JobException { + public void triggerJob(long jobId, C context) throws JobException { checkJobExist(jobId); - jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL); + jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL, context); } public void replayCreateJob(T job) { @@ -191,11 +196,14 @@ public class JobManager> implements Writable { .add("msg", "replay delete scheduler job").build()); } - void cancelTask(Long jobId, Long taskId) throws JobException { - checkJobExist(jobId); - if (null == jobMap.get(jobId).getRunningTasks()) { - throw new JobException("task not exist,taskId:" + taskId); + public void cancelTaskById(String jobName, Long taskId) throws JobException { + for (T job : jobMap.values()) { + if (job.getJobName().equals(jobName)) { + job.cancelTaskById(taskId); + return; + } } + throw new JobException("job not exist, jobName:" + jobName); } @Override @@ -205,7 +213,7 @@ public class JobManager> implements Writable { try { job.write(out); } catch (IOException e) { - log.error("write job error,jobId:" + jobId, e); + log.error("write job error, jobId:" + jobId, e); } }); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java index d07a109fc5..4e31e46701 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java @@ -46,7 +46,7 @@ public class TaskDisruptorGroupManager { private final Map> disruptorMap = new EnumMap<>(JobType.class); @Getter - private TaskDisruptor>> dispatchDisruptor; + private TaskDisruptor> dispatchDisruptor; private static final int DEFAULT_RING_BUFFER_SIZE = 1024; @@ -76,14 +76,14 @@ public class TaskDisruptorGroupManager { } private void registerDispatchDisruptor() { - EventFactory>> dispatchEventFactory = TimerJobEvent.factory(); + EventFactory> dispatchEventFactory = TimerJobEvent.factory(); ThreadFactory dispatchThreadFactory = new CustomThreadFactory("dispatch-task"); WorkHandler[] dispatchTaskExecutorHandlers = new WorkHandler[DISPATCH_TIMER_JOB_CONSUMER_THREAD_NUM]; for (int i = 0; i < DISPATCH_TIMER_JOB_CONSUMER_THREAD_NUM; i++) { dispatchTaskExecutorHandlers[i] = new DispatchTaskHandler(this.disruptorMap); } - EventTranslatorVararg>> eventTranslator = - (event, sequence, args) -> event.setJob((AbstractJob) args[0]); + EventTranslatorVararg> eventTranslator = + (event, sequence, args) -> event.setJob((AbstractJob) args[0]); this.dispatchDisruptor = new TaskDisruptor<>(dispatchEventFactory, DISPATCH_TIMER_JOB_QUEUE_SIZE, dispatchThreadFactory, new BlockingWaitStrategy(), dispatchTaskExecutorHandlers, eventTranslator); @@ -123,7 +123,7 @@ public class TaskDisruptorGroupManager { disruptorMap.put(JobType.MV, mtmvDisruptor); } - public void dispatchTimerJob(AbstractJob job) { + public void dispatchTimerJob(AbstractJob job) { dispatchDisruptor.publishEvent(job); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java index 51b8b6aeb3..877cb30691 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskTokenManager.java @@ -18,7 +18,7 @@ package org.apache.doris.job.manager; import lombok.experimental.UtilityClass; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -29,7 +29,7 @@ import java.util.concurrent.Semaphore; * It provides a method to acquire a semaphore token for a specific job ID with the given maximum concurrency. * If a semaphore doesn't exist for the job ID, it creates a new one and adds it to the map. */ -@Slf4j +@Log4j2 @UtilityClass public class TaskTokenManager { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index c701c9fd1f..47e91d97b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -17,6 +17,8 @@ package org.apache.doris.job.scheduler; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.job.base.AbstractJob; @@ -30,7 +32,7 @@ import org.apache.doris.job.manager.TaskDisruptorGroupManager; import org.apache.doris.job.task.AbstractTask; import io.netty.util.HashedWheelTimer; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; import org.apache.commons.collections.CollectionUtils; import java.io.Closeable; @@ -39,8 +41,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -@Slf4j -public class JobScheduler> implements Closeable { +@Log4j2 +public class JobScheduler, C> implements Closeable { /** * scheduler tasks, it's used to scheduler job @@ -68,6 +70,13 @@ public class JobScheduler> implements Closeable { */ private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = BATCH_SCHEDULER_INTERVAL_SECONDS * 1000L; + /** + * Finished job will be cleared after 24 hours + */ + private static final long FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS = + (Config.finished_job_cleanup_threshold_time_hour > 0 + ? Config.finished_job_cleanup_threshold_time_hour : 24) * 3600 * 1000L; + public void start() { timerTaskScheduler = new HashedWheelTimer(new CustomThreadFactory("timer-task-scheduler"), 1, TimeUnit.SECONDS, HASHED_WHEEL_TIMER_TICKS_PER_WHEEL); @@ -105,20 +114,20 @@ public class JobScheduler> implements Closeable { //manual job will not scheduler if (JobExecuteType.MANUAL.equals(job.getJobConfig().getExecuteType())) { if (job.getJobConfig().isImmediate()) { - schedulerInstantJob(job, TaskType.MANUAL); + schedulerInstantJob(job, TaskType.MANUAL, null); } return; } //todo skip streaming job,improve in the future if (JobExecuteType.INSTANT.equals(job.getJobConfig().getExecuteType())) { - schedulerInstantJob(job, TaskType.SCHEDULED); + schedulerInstantJob(job, TaskType.SCHEDULED, null); } } //RECURRING job and immediate is true if (job.getJobConfig().isImmediate()) { job.getJobConfig().getTimerDefinition().setLatestSchedulerTimeMs(System.currentTimeMillis()); - schedulerInstantJob(job, TaskType.SCHEDULED); + schedulerInstantJob(job, TaskType.SCHEDULED, null); } //if it's timer job and trigger last window already start, we will scheduler it immediately cycleTimerJobScheduler(job); @@ -142,16 +151,11 @@ public class JobScheduler> implements Closeable { } - public void schedulerInstantJob(T job, TaskType taskType) throws JobException { - if (!job.getJobStatus().equals(JobStatus.RUNNING)) { - throw new JobException("job is not running,job id is %d", job.getJobId()); - } - if (!job.isReadyForScheduling()) { - log.info("job is not ready for scheduling,job id is {}", job.getJobId()); - return; - } - List tasks = job.createTasks(taskType); + public void schedulerInstantJob(T job, TaskType taskType, C context) { + List tasks = job.commonCreateTasks(taskType, context); if (CollectionUtils.isEmpty(tasks)) { + log.info("job create task is empty, skip scheduler, job id is {},job name is {}", job.getJobId(), + job.getJobName()); if (job.getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { job.setJobStatus(JobStatus.FINISHED); } @@ -166,19 +170,34 @@ public class JobScheduler> implements Closeable { * We will get the task in the next time window, and then hand it over to the time wheel for timing trigger */ private void executeTimerJobIdsWithinLastTenMinutesWindow() { - if (jobMap.isEmpty()) { - return; - } if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) { this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis(); } this.latestBatchSchedulerTimerTaskTimeMs += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS; + if (jobMap.isEmpty()) { + return; + } for (Map.Entry entry : jobMap.entrySet()) { T job = entry.getValue(); - if (!job.getJobConfig().checkIsTimerJob()) { + if (job.getJobStatus().equals(JobStatus.FINISHED)) { + clearFinishedJob(job); + continue; + } + if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) { continue; } cycleTimerJobScheduler(job); } } + + private void clearFinishedJob(T job) { + if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS < System.currentTimeMillis()) { + return; + } + try { + Env.getCurrentEnv().getJobManager().unregisterJob(job.getJobId()); + } catch (JobException e) { + log.error("clear finish job error, job id is {}", job.getJobId(), e); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index efe38b7013..654ee4fbc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -26,10 +26,10 @@ import org.apache.doris.job.exception.JobException; import com.google.gson.annotations.SerializedName; import lombok.Data; -import lombok.extern.slf4j.Slf4j; +import lombok.extern.log4j.Log4j2; @Data -@Slf4j +@Log4j2 public abstract class AbstractTask implements Task { @SerializedName(value = "jid") @@ -49,9 +49,12 @@ public abstract class AbstractTask implements Task { @SerializedName(value = "tt") private TaskType taskType; + @SerializedName(value = "emg") + private String errMsg; + @Override - public void onFail(String msg) { - status = TaskStatus.FAILD; + public void onFail(String msg) throws JobException { + status = TaskStatus.FAILED; if (!isCallable()) { return; } @@ -60,10 +63,10 @@ public abstract class AbstractTask implements Task { @Override public void onFail() throws JobException { - if (TaskStatus.CANCEL.equals(status)) { + if (TaskStatus.CANCELED.equals(status)) { return; } - status = TaskStatus.FAILD; + status = TaskStatus.FAILED; setFinishTimeMs(System.currentTimeMillis()); if (!isCallable()) { return; @@ -73,7 +76,7 @@ public abstract class AbstractTask implements Task { } private boolean isCallable() { - if (status.equals(TaskStatus.CANCEL)) { + if (status.equals(TaskStatus.CANCELED)) { return false; } if (null != Env.getCurrentEnv().getJobManager().getJob(jobId)) { @@ -84,6 +87,9 @@ public abstract class AbstractTask implements Task { @Override public void onSuccess() throws JobException { + if (TaskStatus.CANCELED.equals(status)) { + return; + } status = TaskStatus.SUCCESS; setFinishTimeMs(System.currentTimeMillis()); if (!isCallable()) { @@ -99,7 +105,7 @@ public abstract class AbstractTask implements Task { @Override public void cancel() throws JobException { - status = TaskStatus.CANCEL; + status = TaskStatus.CANCELED; } @Override @@ -115,12 +121,12 @@ public abstract class AbstractTask implements Task { onSuccess(); } catch (Exception e) { onFail(); - log.warn("execute task error, job id is {},task id is {}", jobId, taskId, e); + log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e); } } public boolean isCancelled() { - return status.equals(TaskStatus.CANCEL); + return status.equals(TaskStatus.CANCELED); } public String getJobName() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java index b13d22ff66..48ecf9c29e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java @@ -54,7 +54,7 @@ public interface Task { * * @param msg The error message associated with the failure. */ - void onFail(String msg); + void onFail(String msg) throws JobException; /** * This method is called when the task executes successfully. diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index df1e3dbe07..416f7f764b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -52,6 +52,7 @@ public class MTMVJobManager implements MTMVHookService { /** * create MTMVJob + * * @param mtmv * @throws DdlException */ @@ -112,6 +113,7 @@ public class MTMVJobManager implements MTMVHookService { /** * drop MTMVJob + * * @param mtmv * @throws DdlException */ @@ -142,6 +144,7 @@ public class MTMVJobManager implements MTMVHookService { /** * drop MTMVJob and then create MTMVJob + * * @param mtmv * @param alterMTMV * @throws DdlException @@ -156,6 +159,7 @@ public class MTMVJobManager implements MTMVHookService { /** * trigger MTMVJob + * * @param info * @throws DdlException * @throws MetaNotFoundException @@ -170,7 +174,7 @@ public class MTMVJobManager implements MTMVHookService { throw new DdlException("jobs not normal,should have one job,but job num is: " + jobs.size()); } try { - Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId()); + Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(), null); } catch (JobException e) { e.printStackTrace(); throw new DdlException(e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java index c4f286ba5c..9b16497d3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java @@ -588,4 +588,8 @@ public class InsertExecutor { throw new AnalysisException(e.getMessage(), e); } } + + public Coordinator getCoordinator() { + return coordinator; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java index 62a8f4d95c..c17f215e75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java @@ -162,6 +162,9 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, insertExecutor.finalizeSink(sink, physicalOlapTableSink.isPartialUpdate(), physicalOlapTableSink.isFromNativeInsertStmt()); executor.setProfileType(ProfileType.LOAD); + // We exposed @StmtExecutor#cancel as a unified entry point for statement interruption + // so we need to set this here + executor.setCoord(insertExecutor.getCoordinator()); insertExecutor.executeSingleInsertTransaction(executor, jobId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 5a2bae1ee0..7b41715a42 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -37,6 +37,7 @@ import org.apache.doris.analysis.AlterColumnStatsStmt; import org.apache.doris.analysis.AlterDatabasePropertyStmt; import org.apache.doris.analysis.AlterDatabaseQuotaStmt; import org.apache.doris.analysis.AlterDatabaseRename; +import org.apache.doris.analysis.AlterJobStatusStmt; import org.apache.doris.analysis.AlterPolicyStmt; import org.apache.doris.analysis.AlterResourceStmt; import org.apache.doris.analysis.AlterRoutineLoadStmt; @@ -51,6 +52,7 @@ import org.apache.doris.analysis.CancelAlterSystemStmt; import org.apache.doris.analysis.CancelAlterTableStmt; import org.apache.doris.analysis.CancelBackupStmt; import org.apache.doris.analysis.CancelExportStmt; +import org.apache.doris.analysis.CancelJobTaskStmt; import org.apache.doris.analysis.CancelLoadStmt; import org.apache.doris.analysis.CleanLabelStmt; import org.apache.doris.analysis.CleanProfileStmt; @@ -95,7 +97,6 @@ import org.apache.doris.analysis.DropWorkloadGroupStmt; import org.apache.doris.analysis.GrantStmt; import org.apache.doris.analysis.InstallPluginStmt; import org.apache.doris.analysis.KillAnalysisJobStmt; -import org.apache.doris.analysis.PauseJobStmt; import org.apache.doris.analysis.PauseRoutineLoadStmt; import org.apache.doris.analysis.PauseSyncJobStmt; import org.apache.doris.analysis.RecoverDbStmt; @@ -106,12 +107,10 @@ import org.apache.doris.analysis.RefreshDbStmt; import org.apache.doris.analysis.RefreshLdapStmt; import org.apache.doris.analysis.RefreshTableStmt; import org.apache.doris.analysis.RestoreStmt; -import org.apache.doris.analysis.ResumeJobStmt; import org.apache.doris.analysis.ResumeRoutineLoadStmt; import org.apache.doris.analysis.ResumeSyncJobStmt; import org.apache.doris.analysis.RevokeStmt; import org.apache.doris.analysis.SetUserPropertyStmt; -import org.apache.doris.analysis.StopJobStmt; import org.apache.doris.analysis.StopRoutineLoadStmt; import org.apache.doris.analysis.StopSyncJobStmt; import org.apache.doris.analysis.SyncStmt; @@ -121,8 +120,6 @@ import org.apache.doris.catalog.EncryptKeyHelper; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.ProfileManager; -import org.apache.doris.job.common.JobStatus; -import org.apache.doris.job.common.JobType; import org.apache.doris.load.sync.SyncJobManager; import org.apache.doris.persist.CleanQueryStatsInfo; import org.apache.doris.statistics.StatisticsRepository; @@ -190,24 +187,17 @@ public class DdlExecutor { } catch (Exception e) { throw new DdlException(e.getMessage()); } - } else if (ddlStmt instanceof StopJobStmt) { - StopJobStmt stmt = (StopJobStmt) ddlStmt; + } else if (ddlStmt instanceof AlterJobStatusStmt) { + AlterJobStatusStmt stmt = (AlterJobStatusStmt) ddlStmt; try { - env.getJobManager().unregisterJob(stmt.getName(), JobType.INSERT); + env.getJobManager().alterJobStatus(stmt.getJobName(), stmt.getJobStatus()); } catch (Exception e) { throw new DdlException(e.getMessage()); } - } else if (ddlStmt instanceof PauseJobStmt) { - PauseJobStmt stmt = (PauseJobStmt) ddlStmt; + } else if (ddlStmt instanceof CancelJobTaskStmt) { + CancelJobTaskStmt stmt = (CancelJobTaskStmt) ddlStmt; try { - env.getJobManager().alterJobStatus(stmt.getName(), JobStatus.PAUSED, JobType.INSERT); - } catch (Exception e) { - throw new DdlException(e.getMessage()); - } - } else if (ddlStmt instanceof ResumeJobStmt) { - ResumeJobStmt stmt = (ResumeJobStmt) ddlStmt; - try { - env.getJobManager().alterJobStatus(stmt.getName(), JobStatus.RUNNING, JobType.INSERT); + env.getJobManager().cancelTaskById(stmt.getJobName(), stmt.getTaskId()); } catch (Exception e) { throw new DdlException(e.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 430f790ecd..fd8c4896f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -1429,7 +1429,7 @@ public class ShowExecutor { return; } org.apache.doris.job.base.AbstractJob job = jobs.get(0); - List jobTasks = job.queryTasks(); + List jobTasks = job.queryAllTasks(); if (CollectionUtils.isEmpty(jobTasks)) { resultSet = new ShowResultSet(job.getTaskMetaData(), rows); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index dbb9b3bd57..29e7aaa3b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -177,6 +177,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.protobuf.ByteString; +import lombok.Setter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -214,6 +215,8 @@ public class StmtExecutor { private StatementBase parsedStmt; private Analyzer analyzer; private ProfileType profileType = ProfileType.QUERY; + + @Setter private volatile Coordinator coord = null; private MasterOpExecutor masterOpExecutor = null; private RedirectStatus redirectStatus = null; diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java index eadc6c567d..87d1430375 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java @@ -48,22 +48,22 @@ public class JobExecutionConfigurationTest { configuration.setExecuteType(JobExecuteType.RECURRING); TimerDefinition timerDefinition = new TimerDefinition(); - timerDefinition.setStartTimeMs(1000L); // Start time set to 1 second in the future + timerDefinition.setStartTimeMs(100000L); // Start time set to 1 second in the future timerDefinition.setInterval(10L); // Interval set to 10 milliseconds - timerDefinition.setIntervalUnit(IntervalUnit.SECOND); + timerDefinition.setIntervalUnit(IntervalUnit.MINUTE); configuration.setTimerDefinition(timerDefinition); List delayTimes = configuration.getTriggerDelayTimes( - 0L, 0L, 11000L); + 0L, 0L, 1100000L); Assertions.assertEquals(2, delayTimes.size()); - Assertions.assertArrayEquals(new Long[]{1L, 11L}, delayTimes.toArray()); + Assertions.assertArrayEquals(new Long[]{100L, 700L}, delayTimes.toArray()); delayTimes = configuration.getTriggerDelayTimes( - 2000L, 0L, 11000L); + 200000L, 0L, 1100000L); Assertions.assertEquals(1, delayTimes.size()); - Assertions.assertArrayEquals(new Long[]{ 9L}, delayTimes.toArray()); + Assertions.assertArrayEquals(new Long[]{ 500L}, delayTimes.toArray()); delayTimes = configuration.getTriggerDelayTimes( - 1001L, 0L, 10000L); + 1001000L, 0L, 1000000L); Assertions.assertEquals(0, delayTimes.size()); } diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf index df5890b5d1..fadfe8066b 100644 --- a/regression-test/pipeline/p0/conf/fe.conf +++ b/regression-test/pipeline/p0/conf/fe.conf @@ -95,3 +95,7 @@ edit_log_roll_num = 1000 history_job_keep_max_second = 300 streaming_label_keep_max_second = 300 label_keep_max_second = 300 + +# job test configurations +#allows the creation of jobs with an interval of second +enable_job_schedule_second_for_test = true diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index 76d38460ff..adde94dc6f 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -25,7 +25,7 @@ suite("test_base_insert_job") { def jobName = "insert_recovery_test_base_insert_job" sql """drop table if exists `${tableName}` force""" sql """ - STOP JOB for ${jobName} + DROP JOB where jobname = '${jobName}' """ sql """ @@ -49,7 +49,7 @@ suite("test_base_insert_job") { println jobs assert 3>=jobs.size() >= (2 as Boolean) //at least 2 records, some times 3 records sql """ - STOP JOB for ${jobName} + DROP JOB where jobname = '${jobName}' """ sql """drop table if exists `${tableName}` force """ sql """ @@ -70,42 +70,76 @@ suite("test_base_insert_job") { def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); def startTime= dateTime.format(formatter); + def dataCount = sql """select count(*) from ${tableName}""" + assert dataCount.get(0).get(0) == 0 sql """ - CREATE JOB ${jobName} ON SCHEDULER at '${startTime}' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + CREATE JOB ${jobName} ON SCHEDULER at '${startTime}' comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19', sleep(1000), 1001); """ - Thread.sleep(2500) - - def datas = sql """select * from ${tableName}""" + Thread.sleep(3000) + + // test cancel task + def datas = sql """show job tasks for ${jobName}""" println datas - //assert datas.size() == 1 + assert datas.size() == 1 + println datas.get(0).get(2) + assert datas.get(0).get(2) == "RUNNING" + def taskId = datas.get(0).get(0) + sql """cancel task where jobName='${jobName}' and taskId= ${taskId}""" + def cancelTask = sql """ show job tasks for ${jobName}""" + println cancelTask + //check task status + assert cancelTask.size() == 1 + assert cancelTask.get(0).get(2) == "CANCELED" + // check table data + def dataCount1 = sql """select count(*) from ${tableName}""" + assert dataCount1.get(0).get(0) == 0 + // check job status + def oncejob=sql """show job for ${jobName} """ + println oncejob + assert oncejob.get(0).get(5) == "FINISHED" + //assert comment + println oncejob.get(0).get(8) + //check comment + assert oncejob.get(0).get(8) == "test for test&68686781jbjbhj//ncsa" + try{ sql """ CREATE JOB ${jobName} ON SCHEDULER at '${startTime}' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { - assert true + assert e.getMessage().contains("startTimeMs must be greater than current time") } sql """ - STOP JOB for test_one_time_error_starts + DROP JOB where jobname = 'test_one_time_error_starts' """ try{ sql """ CREATE JOB test_one_time_error_starts ON SCHEDULER at '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { - assert true + assert e.getMessage().contains("startTimeMs must be greater than current time") } sql """ - STOP JOB for test_error_starts + DROP JOB where jobname = 'test_error_starts' """ try{ sql """ CREATE JOB test_error_starts ON SCHEDULER every 1 second ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { - assert true + assert e.getMessage().contains("end time cannot be less than start time") } - + sql """ + DROP JOB where jobname = 'test_error_starts' + """ + try{ + sql """ + CREATE JOB test_error_starts ON SCHEDULER every 1 years ends '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + """ + } catch (Exception e) { + assert e.getMessage().contains("interval time unit can not be years") + } + }