[Feature](job)support cancel task and fix log invalid (#27703)

- Running tasks can now be shown, and a failure when cancelling them is fixed
- When an insert job's next scheduling cycle arrives while tasks from the previous cycle are still running, that scheduling run is canceled.
- refactor job status changes SQL
- Fix timer job window error
- Support cancel task
This commit is contained in:
Calvin Kirs
2023-12-06 10:44:09 +08:00
committed by GitHub
parent 4c90b459e5
commit cbf1f8620a
35 changed files with 529 additions and 325 deletions

View File

@ -17,47 +17,47 @@
package org.apache.doris.analysis;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.job.common.JobStatus;
import com.google.common.base.Strings;
import lombok.Getter;
public class ResumeJobStmt extends DdlStmt {
public class AlterJobStatusStmt extends DdlStmt {
private final LabelName labelName;
private Expr expr;
private String db;
private static final String columnName = "jobName";
public ResumeJobStmt(LabelName labelName) {
this.labelName = labelName;
}
@Getter
private String jobName;
public boolean isAll() {
return labelName == null;
}
@Getter
private JobStatus jobStatus;
public String getName() {
return labelName.getLabelName();
}
public String getDbFullName() {
return db;
public AlterJobStatusStmt(Expr whereClause, JobStatus jobStatus) {
this.expr = whereClause;
this.jobStatus = jobStatus;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
CreateJobStmt.checkAuth();
if (labelName != null) {
labelName.analyze(analyzer);
db = labelName.getDbName();
} else {
if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb());
String inputCol = ((SlotRef) expr.getChild(0)).getColumnName();
if (!inputCol.equalsIgnoreCase(columnName)) {
throw new AnalysisException("Current not support " + inputCol);
}
if (!(expr.getChild(1) instanceof StringLiteral)) {
throw new AnalysisException("Value must is string");
}
String inputValue = expr.getChild(1).getStringValue();
if (Strings.isNullOrEmpty(inputValue)) {
throw new AnalysisException("Value can't is null");
}
this.jobName = inputValue;
}
}

View File

@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/StringLiteral.java
// and modified by Doris
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import lombok.Getter;
/**
 * CANCEL TASK statement: cancels a single running task of a job.
 *
 * Expects a WHERE clause of the exact shape:
 *   jobName = '&lt;name&gt;' AND taskId = &lt;id&gt;
 * analyze() validates that shape and extracts jobName/taskId.
 */
public class CancelJobTaskStmt extends DdlStmt {
@Getter
private String jobName;
@Getter
private Long taskId;
// WHERE clause supplied by the parser; validated in analyze().
private Expr expr;
// The only two column names accepted in the WHERE clause.
private static final String jobNameKey = "jobName";
private static final String taskIdKey = "taskId";
public CancelJobTaskStmt(Expr whereExpr) {
this.expr = whereExpr;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
// Requires ADMIN privilege (delegates to CreateJobStmt.checkAuth()).
CreateJobStmt.checkAuth();
// NOTE(review): assumes the parser always produces a CompoundPredicate with
// two equality children here; any other WHERE shape would throw
// ClassCastException rather than AnalysisException — confirm the grammar
// guarantees this before relying on it.
CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
if (!compoundPredicate.getOp().equals(CompoundPredicate.Operator.AND)) {
throw new AnalysisException("Only allow compound predicate with operator AND");
}
// First conjunct must be: jobName = '<string literal>' (column name is case-insensitive).
String jobNameInput = ((SlotRef) compoundPredicate.getChildren().get(0).getChild(0)).getColumnName();
if (!jobNameKey.equalsIgnoreCase(jobNameInput)) {
throw new AnalysisException("Current not support " + jobNameInput);
}
if (!(compoundPredicate.getChildren().get(0).getChild(1) instanceof StringLiteral)) {
throw new AnalysisException("JobName value must is string");
}
this.jobName = compoundPredicate.getChildren().get(0).getChild(1).getStringValue();
// Second conjunct must be: taskId = <integer literal>.
String taskIdInput = ((SlotRef) compoundPredicate.getChildren().get(1).getChild(0)).getColumnName();
if (!taskIdKey.equalsIgnoreCase(taskIdInput)) {
throw new AnalysisException("Current not support " + taskIdInput);
}
if (!(compoundPredicate.getChildren().get(1).getChild(1) instanceof IntLiteral)) {
throw new AnalysisException("task id value must is large int");
}
this.taskId = ((IntLiteral) compoundPredicate.getChildren().get(1).getChild(1)).getLongValue();
}
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
@ -39,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.HashSet;
import java.util.Set;
/**
* syntax:
@ -66,7 +68,7 @@ public class CreateJobStmt extends DdlStmt {
private StatementBase doStmt;
@Getter
private AbstractJob<?> jobInstance;
private AbstractJob jobInstance;
private final LabelName labelName;
@ -83,6 +85,13 @@ public class CreateJobStmt extends DdlStmt {
private final String comment;
private JobExecuteType executeType;
// exclude job name prefix, which is used by inner job
private final Set<String> excludeJobNamePrefix = new HashSet<>();
{
excludeJobNamePrefix.add("inner_mtmv_");
}
private static final ImmutableSet<Class<? extends DdlStmt>> supportStmtSuperClass
= new ImmutableSet.Builder<Class<? extends DdlStmt>>().add(InsertStmt.class)
.add(UpdateStmt.class).build();
@ -126,7 +135,15 @@ public class CreateJobStmt extends DdlStmt {
timerDefinition.setInterval(interval);
}
if (null != intervalTimeUnit) {
timerDefinition.setIntervalUnit(IntervalUnit.valueOf(intervalTimeUnit.toUpperCase()));
IntervalUnit intervalUnit = IntervalUnit.fromString(intervalTimeUnit.toUpperCase());
if (null == intervalUnit) {
throw new AnalysisException("interval time unit can not be " + intervalTimeUnit);
}
if (intervalUnit.equals(IntervalUnit.SECOND)
&& !Config.enable_job_schedule_second_for_test) {
throw new AnalysisException("interval time unit can not be second");
}
timerDefinition.setIntervalUnit(intervalUnit);
}
if (null != startsTimeStamp) {
timerDefinition.setStartTimeMs(TimeUtils.timeStringToLong(startsTimeStamp));
@ -134,6 +151,7 @@ public class CreateJobStmt extends DdlStmt {
if (null != endsTimeStamp) {
timerDefinition.setEndTimeMs(TimeUtils.timeStringToLong(endsTimeStamp));
}
checkJobName(labelName.getLabelName());
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
job.setJobConfig(jobExecutionConfiguration);
@ -151,6 +169,14 @@ public class CreateJobStmt extends DdlStmt {
jobInstance = job;
}
private void checkJobName(String jobName) throws AnalysisException {
for (String prefix : excludeJobNamePrefix) {
if (jobName.startsWith(prefix)) {
throw new AnalysisException("job name can not start with " + prefix);
}
}
}
protected static void checkAuth() throws AnalysisException {
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");

View File

@ -1,69 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import com.google.common.base.Strings;
/**
* syntax:
* PAUSE JOB FOR [database.]name
* we can pause a job by jobName
* only a running job can be paused, and it will be paused immediately
* paused job can be resumed by RESUME EVENT FOR jobName
*/
public class PauseJobStmt extends DdlStmt {
// Optional [database.]name label; null means "all jobs in the current db".
private final LabelName labelName;
// Fully-qualified database name, resolved during analyze().
private String db;
public PauseJobStmt(LabelName labelName) {
this.labelName = labelName;
}
// True when no label was given, i.e. the statement targets all jobs.
public boolean isAll() {
return labelName == null;
}
// NOTE(review): throws NPE when labelName is null (the isAll() case) —
// callers are expected to check isAll() before calling this; confirm.
public String getName() {
return labelName.getLabelName();
}
public String getDbFullName() {
return db;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
// Requires ADMIN privilege (delegates to CreateJobStmt.checkAuth()).
CreateJobStmt.checkAuth();
if (labelName != null) {
// Explicit [database.]name form: resolve db from the label itself.
labelName.analyze(analyzer);
db = labelName.getDbName();
} else {
// No label: fall back to the session's current database, which must be set.
if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb());
}
}
}

View File

@ -1,49 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.common.UserException;
/**
* syntax:
* STOP JOB FOR [database.]name
* only a running job can be stopped, and a stopped job can't be resumed
*/
public class StopJobStmt extends DdlStmt {
// Mandatory [database.]name label identifying the job to stop.
private final LabelName labelName;
public StopJobStmt(LabelName labelName) {
this.labelName = labelName;
}
public String getName() {
return labelName.getLabelName();
}
// NOTE(review): returns the label's db name directly; unlike PauseJobStmt this
// is only valid after analyze() has resolved the label — confirm call order.
public String getDbFullName() {
return labelName.getDbName();
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
// Requires ADMIN privilege (delegates to CreateJobStmt.checkAuth()).
CreateJobStmt.checkAuth();
// Resolves and validates the [database.]name label.
labelName.analyze(analyzer);
}
}

View File

@ -360,7 +360,7 @@ public class Env {
private MetastoreEventsProcessor metastoreEventsProcessor;
private ExportTaskRegister exportTaskRegister;
private JobManager<? extends AbstractJob> jobManager;
private JobManager<? extends AbstractJob<?, ?>, ?> jobManager;
private TransientTaskManager transientTaskManager;
private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos

View File

@ -26,6 +26,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.persist.gson.GsonUtils;
@ -36,6 +37,7 @@ import org.apache.doris.thrift.TRow;
import com.google.common.collect.ImmutableList;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomUtils;
@ -43,10 +45,14 @@ import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@Data
public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, Writable {
@Log4j2
public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C>, Writable {
@SerializedName(value = "jid")
private Long jobId;
@ -75,6 +81,9 @@ public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, Wri
@SerializedName(value = "sql")
String executeSql;
@SerializedName(value = "ftm")
private long finishTimeMs;
private List<T> runningTasks = new ArrayList<>();
@Override
@ -106,7 +115,43 @@ public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, Wri
throw new JobException("no running task");
}
runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst()
.orElseThrow(() -> new JobException("no task id:" + taskId)).cancel();
.orElseThrow(() -> new JobException("no task id: " + taskId)).cancel();
if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
updateJobStatus(JobStatus.FINISHED);
}
}
public List<T> queryAllTasks() {
List<T> tasks = new ArrayList<>();
if (CollectionUtils.isEmpty(runningTasks)) {
return queryTasks();
}
List<T> historyTasks = queryTasks();
if (CollectionUtils.isNotEmpty(historyTasks)) {
tasks.addAll(historyTasks);
}
Set<Long> loadTaskIds = tasks.stream().map(AbstractTask::getTaskId).collect(Collectors.toSet());
runningTasks.forEach(task -> {
if (!loadTaskIds.contains(task.getTaskId())) {
tasks.add(task);
}
});
Comparator<T> taskComparator = Comparator.comparingLong(T::getCreateTimeMs).reversed();
tasks.sort(taskComparator);
return tasks;
}
public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
if (!getJobStatus().equals(JobStatus.RUNNING)) {
log.warn("job is not running, job id is {}", jobId);
return new ArrayList<>();
}
if (!isReadyForScheduling(taskContext)) {
log.info("job is not ready for scheduling, job id is {}", jobId);
return new ArrayList<>();
}
return createTasks(taskType, taskContext);
}
public void initTasks(List<? extends AbstractTask> tasks) {
@ -133,21 +178,26 @@ public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, Wri
checkJobParamsInternal();
}
public void updateJobStatus(JobStatus newJobStatus) {
public void updateJobStatus(JobStatus newJobStatus) throws JobException {
if (null == newJobStatus) {
throw new IllegalArgumentException("jobStatus cannot be null");
}
String errorMsg = String.format("Can't update job %s status to the %s status",
jobStatus.name(), newJobStatus.name());
if (jobStatus == newJobStatus) {
throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status",
jobStatus.name(), this.jobStatus.name()));
throw new IllegalArgumentException(errorMsg);
}
if (newJobStatus.equals(JobStatus.RUNNING) && !jobStatus.equals(JobStatus.PAUSED)) {
throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status",
jobStatus.name(), this.jobStatus.name()));
throw new IllegalArgumentException(errorMsg);
}
if (newJobStatus.equals(JobStatus.STOPPED) && !jobStatus.equals(JobStatus.RUNNING)) {
throw new IllegalArgumentException(String.format("Can't update job %s status to the %s status",
jobStatus.name(), this.jobStatus.name()));
throw new IllegalArgumentException(errorMsg);
}
if (newJobStatus.equals(JobStatus.FINISHED)) {
this.finishTimeMs = System.currentTimeMillis();
}
if (JobStatus.PAUSED.equals(newJobStatus)) {
cancelAllTasks();
}
jobStatus = newJobStatus;
}
@ -157,25 +207,26 @@ public abstract class AbstractJob<T extends AbstractTask> implements Job<T>, Wri
public static AbstractJob readFields(DataInput in) throws IOException {
String jsonJob = Text.readString(in);
AbstractJob<?> job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class);
AbstractJob job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class);
job.setRunningTasks(new ArrayList<>());
return job;
}
@Override
public void onTaskFail(T task) {
public void onTaskFail(T task) throws JobException {
updateJobStatusIfEnd();
runningTasks.remove(task);
}
@Override
public void onTaskSuccess(T task) {
public void onTaskSuccess(T task) throws JobException {
updateJobStatusIfEnd();
runningTasks.remove(task);
}
private void updateJobStatusIfEnd() {
private void updateJobStatusIfEnd() throws JobException {
JobExecuteType executeType = getJobConfig().getExecuteType();
if (executeType.equals(JobExecuteType.MANUAL)) {
return;

View File

@ -33,23 +33,29 @@ import java.util.List;
* The job status is used to control the execution of the job.
*
* @param <T> The type of task associated with the job, extending AbstractTask.
* <C> The type of task context associated with the job
*/
public interface Job<T extends AbstractTask> {
public interface Job<T extends AbstractTask, C> {
/**
* Creates a list of tasks of the specified type for this job.
* you can set task context for task,
* eg: insert task, execute sql is insert into table select * from table1 limit ${limit}
* every task context is different, eg: limit 1000, limit 2000,you can set task context to 1000,2000
* it's used by manual task or streaming task
*
* @param taskType The type of tasks to create.
* @param taskType The type of tasks to create. @See TaskType
* @param taskContext The context of tasks to create.
* @return A list of tasks.
*/
List<T> createTasks(TaskType taskType);
List<T> createTasks(TaskType taskType, C taskContext);
/**
* Cancels the task with the specified taskId.
*
* @param taskId The ID of the task to cancel.
* @throws JobException If the task is not in the running state, it may have already
* finished and cannot be cancelled.
* finished and cannot be cancelled.
*/
void cancelTaskById(long taskId) throws JobException;
@ -60,7 +66,7 @@ public interface Job<T extends AbstractTask> {
*
* @return True if the job is ready for scheduling, false otherwise.
*/
boolean isReadyForScheduling();
boolean isReadyForScheduling(C taskContext);
/**
* Retrieves the metadata for the job, which is used to display job information.
@ -103,17 +109,18 @@ public interface Job<T extends AbstractTask> {
*
* @param task The failed task.
*/
void onTaskFail(T task);
void onTaskFail(T task) throws JobException;
/**
* Notifies the job when a task execution is successful.
*
* @param task The successful task.
*/
void onTaskSuccess(T task);
void onTaskSuccess(T task) throws JobException;
/**
* get the job's show info, which is used to sql show the job information
*
* @return List<String> job common show info
*/
List<String> getShowInfo();

View File

@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
public enum IntervalUnit {
SECOND("second", 0L, TimeUnit.SECONDS::toMillis),
MINUTE("minute", 0L, TimeUnit.MINUTES::toMillis),
HOUR("hour", 0L, TimeUnit.HOURS::toMillis),
@ -57,7 +56,7 @@ public enum IntervalUnit {
return Arrays.stream(IntervalUnit.values())
.filter(config -> config.getUnit().equals(name))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Unknown configuration " + name));
.orElseThrow(() -> new IllegalArgumentException("Unknown configuration interval " + name));
}
public Long getIntervalMs(Long interval) {

View File

@ -19,8 +19,8 @@ package org.apache.doris.job.common;
public enum TaskStatus {
PENDING,
CANCEL,
CANCELED,
RUNNING,
SUCCESS,
FAILD;
FAILED;
}

View File

@ -23,12 +23,12 @@ import com.lmax.disruptor.EventFactory;
import lombok.Data;
@Data
public class TimerJobEvent<T extends AbstractJob<?>> {
public class TimerJobEvent<T extends AbstractJob> {
private T job;
public static <T extends AbstractJob<?>> EventFactory<TimerJobEvent<T>> factory() {
public static <T extends AbstractJob> EventFactory<TimerJobEvent<T>> factory() {
return TimerJobEvent::new;
}

View File

@ -21,7 +21,7 @@ import org.apache.doris.job.disruptor.ExecuteTaskEvent;
import org.apache.doris.job.task.AbstractTask;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;
/**
* DefaultTaskExecutor is an implementation of the TaskExecutor interface.
@ -30,7 +30,7 @@ import lombok.extern.slf4j.Slf4j;
* It executes a given AbstractTask by acquiring a semaphore token from the TaskTokenManager
* and releasing it after the task execution.
*/
@Slf4j
@Log4j2
public class DefaultTaskExecutorHandler<T extends AbstractTask> implements WorkHandler<ExecuteTaskEvent<T>> {

View File

@ -26,8 +26,8 @@ import org.apache.doris.job.disruptor.TimerJobEvent;
import org.apache.doris.job.task.AbstractTask;
import com.lmax.disruptor.WorkHandler;
import jline.internal.Log;
import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
import java.util.List;
import java.util.Map;
@ -37,8 +37,8 @@ import java.util.Map;
* when job is ready for scheduling and job status is running
* we will create task and publish to task disruptor @see DefaultTaskExecutorHandler
*/
@Slf4j
public class DispatchTaskHandler<T extends AbstractJob<?>> implements WorkHandler<TimerJobEvent<T>> {
@Log4j2
public class DispatchTaskHandler<T extends AbstractJob> implements WorkHandler<TimerJobEvent<T>> {
private final Map<JobType, TaskDisruptor<T>> disruptorMap;
@ -50,19 +50,27 @@ public class DispatchTaskHandler<T extends AbstractJob<?>> implements WorkHandle
@Override
public void onEvent(TimerJobEvent<T> event) {
try {
log.info("dispatch timer job, job id is {}, job name is {}", event.getJob().getJobId(),
event.getJob().getJobName());
if (null == event.getJob()) {
log.info("job is null,may be job is deleted, ignore");
return;
}
if (event.getJob().isReadyForScheduling() && event.getJob().getJobStatus() == JobStatus.RUNNING) {
List<? extends AbstractTask> tasks = event.getJob().createTasks(TaskType.SCHEDULED);
if (event.getJob().isReadyForScheduling(null) && event.getJob().getJobStatus() == JobStatus.RUNNING) {
List<? extends AbstractTask> tasks = event.getJob().commonCreateTasks(TaskType.SCHEDULED, null);
if (CollectionUtils.isEmpty(tasks)) {
log.warn("job is ready for scheduling, but create task is empty, skip scheduler,"
+ "job id is {}," + " job name is {}", event.getJob().getJobId(),
event.getJob().getJobName());
return;
}
JobType jobType = event.getJob().getJobType();
for (AbstractTask task : tasks) {
disruptorMap.get(jobType).publishEvent(task, event.getJob().getJobConfig());
}
}
} catch (Exception e) {
Log.warn("dispatch timer job error, task id is {}", event.getJob().getJobId(), e);
log.warn("dispatch timer job error, task id is {}", event.getJob().getJobId(), e);
}
}
}

View File

@ -18,15 +18,15 @@
package org.apache.doris.job.executor;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.disruptor.TaskDisruptor;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import jline.internal.Log;
import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;
@Slf4j
public class TimerJobSchedulerTask<T extends AbstractJob<?>> implements TimerTask {
@Log4j2
public class TimerJobSchedulerTask<T extends AbstractJob> implements TimerTask {
private TaskDisruptor dispatchDisruptor;
@ -40,9 +40,13 @@ public class TimerJobSchedulerTask<T extends AbstractJob<?>> implements TimerTas
@Override
public void run(Timeout timeout) {
try {
if (!JobStatus.RUNNING.equals(job.getJobStatus())) {
log.info("job status is not running, job id is {}, skip dispatch", this.job.getJobId());
return;
}
dispatchDisruptor.publishEvent(this.job);
} catch (Exception e) {
Log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e);
log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e);
}
}
}

View File

@ -47,11 +47,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
@Data
@Slf4j
public class InsertJob extends AbstractJob<InsertTask> {
public class InsertJob extends AbstractJob<InsertTask, Map> {
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
new Column("Id", ScalarType.createStringType()),
@ -87,11 +88,11 @@ public class InsertJob extends AbstractJob<InsertTask> {
ConcurrentLinkedQueue<Long> taskIdList;
// max save task num, do we need to config it?
private static final int MAX_SAVE_TASK_NUM = 50;
private static final int MAX_SAVE_TASK_NUM = 100;
@Override
public List<InsertTask> createTasks(TaskType taskType) {
public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
//nothing need to do in insert job
InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser());
task.setJobId(getJobId());
task.setTaskType(taskType);
@ -123,17 +124,17 @@ public class InsertJob extends AbstractJob<InsertTask> {
super.cancelTaskById(taskId);
}
@Override
public boolean isReadyForScheduling(Map taskContext) {
return CollectionUtils.isEmpty(getRunningTasks());
}
@Override
public void cancelAllTasks() throws JobException {
super.cancelAllTasks();
}
@Override
public boolean isReadyForScheduling() {
return true;
}
@Override
protected void checkJobParamsInternal() {
@ -162,8 +163,9 @@ public class InsertJob extends AbstractJob<InsertTask> {
InsertTask task;
try {
task = new InsertTask(loadJob.getLabel(), loadJob.getDb().getFullName(), null, getCreateUser());
task.setCreateTimeMs(loadJob.getCreateTimestamp());
} catch (MetaNotFoundException e) {
log.warn("load job not found,job id is {}", loadJob.getId());
log.warn("load job not found, job id is {}", loadJob.getId());
return;
}
task.setJobId(getJobId());
@ -195,7 +197,7 @@ public class InsertJob extends AbstractJob<InsertTask> {
}
@Override
public void onTaskSuccess(InsertTask task) {
public void onTaskSuccess(InsertTask task) throws JobException {
super.onTaskSuccess(task);
}

View File

@ -41,7 +41,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;
import java.util.ArrayList;
import java.util.List;
@ -52,7 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* todo implement this later
*/
@Slf4j
@Log4j2
public class InsertTask extends AbstractTask {
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
@ -144,8 +144,14 @@ public class InsertTask extends AbstractTask {
@Override
public void run() throws JobException {
try {
if (isCanceled.get()) {
log.info("task has been canceled, task id is {}", getTaskId());
return;
}
command.run(ctx, stmtExecutor);
} catch (Exception e) {
log.warn("execute insert task error, job id is {}, task id is {},sql is {}", getJobId(),
getTaskId(), sql, e);
throw new JobException(e);
}
}
@ -177,7 +183,7 @@ public class InsertTask extends AbstractTask {
@Override
public List<String> getShowInfo() {
if (null == loadJob) {
return new ArrayList<>();
return getPendingTaskShowInfo();
}
List<String> jobInfo = Lists.newArrayList();
// jobId
@ -258,4 +264,22 @@ public class InsertTask extends AbstractTask {
return trow;
}
// if task not start, load job is null,return pending task show info
private List<String> getPendingTaskShowInfo() {
List<String> datas = new ArrayList<>();
datas.add(String.valueOf(getTaskId()));
datas.add(getJobId() + "_" + getTaskId());
datas.add(getStatus().name());
datas.add(FeConstants.null_string);
datas.add(FeConstants.null_string);
datas.add(FeConstants.null_string);
datas.add(TimeUtils.longToTimeString(getCreateTimeMs()));
datas.add(FeConstants.null_string);
datas.add(FeConstants.null_string);
datas.add(FeConstants.null_string);
datas.add(userIdentity.getQualifiedUser());
return datas;
}
}

View File

@ -47,8 +47,9 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class MTMVJob extends AbstractJob<MTMVTask> {
public class MTMVJob extends AbstractJob<MTMVTask, Map> {
private static final Logger LOG = LogManager.getLogger(MTMVJob.class);
private static final ShowResultSetMetaData JOB_META_DATA =
ShowResultSetMetaData.builder()
@ -109,7 +110,7 @@ public class MTMVJob extends AbstractJob<MTMVTask> {
}
@Override
public List<MTMVTask> createTasks(TaskType taskType) {
public List<MTMVTask> createTasks(TaskType taskType, Map taskContext) {
MTMVTask task = new MTMVTask(dbId, mtmvId);
task.setTaskType(taskType);
ArrayList<MTMVTask> tasks = new ArrayList<>();
@ -119,7 +120,7 @@ public class MTMVJob extends AbstractJob<MTMVTask> {
}
@Override
public boolean isReadyForScheduling() {
public boolean isReadyForScheduling(Map taskContext) {
return CollectionUtils.isEmpty(getRunningTasks());
}

View File

@ -29,7 +29,7 @@ import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.scheduler.JobScheduler;
import org.apache.doris.job.task.AbstractTask;
import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import java.io.DataInput;
@ -39,8 +39,8 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Slf4j
public class JobManager<T extends AbstractJob<?>> implements Writable {
@Log4j2
public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
private final ConcurrentHashMap<Long, T> jobMap = new ConcurrentHashMap<>(32);
@ -54,9 +54,9 @@ public class JobManager<T extends AbstractJob<?>> implements Writable {
public void registerJob(T job) throws JobException {
job.checkJobParams();
checkJobNameExist(job.getJobName(), job.getJobType());
checkJobNameExist(job.getJobName());
if (jobMap.get(job.getJobId()) != null) {
throw new JobException("job id exist,jobId:" + job.getJobId());
throw new JobException("job id exist, jobId:" + job.getJobId());
}
Env.getCurrentEnv().getEditLog().logCreateJob(job);
jobMap.put(job.getJobId(), job);
@ -65,9 +65,9 @@ public class JobManager<T extends AbstractJob<?>> implements Writable {
}
private void checkJobNameExist(String jobName, JobType type) throws JobException {
if (jobMap.values().stream().anyMatch(a -> a.getJobName().equals(jobName) && a.getJobType().equals(type))) {
throw new JobException("job name exist,jobName:" + jobName);
private void checkJobNameExist(String jobName) throws JobException {
if (jobMap.values().stream().anyMatch(a -> a.getJobName().equals(jobName))) {
throw new JobException("job name exist, jobName:" + jobName);
}
}
@ -79,13 +79,13 @@ public class JobManager<T extends AbstractJob<?>> implements Writable {
jobMap.remove(jobId);
}
public void unregisterJob(String jobName, JobType jobType) throws JobException {
public void unregisterJob(String jobName) throws JobException {
for (T a : jobMap.values()) {
if (a.getJobName().equals(jobName) && a.getJobType().equals(jobType)) {
if (a.getJobName().equals(jobName)) {
try {
unregisterJob(a.getJobId());
} catch (JobException e) {
throw new JobException("unregister job error,jobName:" + jobName);
throw new JobException("unregister job error, jobName:" + jobName);
}
}
}
@ -98,13 +98,17 @@ public class JobManager<T extends AbstractJob<?>> implements Writable {
Env.getCurrentEnv().getEditLog().logUpdateJob(jobMap.get(jobId));
}
public void alterJobStatus(String jobName, JobStatus jobStatus, JobType jobType) throws JobException {
public void alterJobStatus(String jobName, JobStatus jobStatus) throws JobException {
for (T a : jobMap.values()) {
if (a.getJobName().equals(jobName) && jobType.equals(a.getJobType())) {
if (a.getJobName().equals(jobName)) {
try {
if (jobStatus.equals(JobStatus.STOPPED)) {
unregisterJob(a.getJobId());
return;
}
alterJobStatus(a.getJobId(), jobStatus);
} catch (JobException e) {
throw new JobException("unregister job error,jobName:" + jobName);
throw new JobException("unregister job error, jobName:" + jobName);
}
}
}
@ -112,7 +116,7 @@ public class JobManager<T extends AbstractJob<?>> implements Writable {
private void checkJobExist(Long jobId) throws JobException {
if (null == jobMap.get(jobId)) {
throw new JobException("job not exist,jobId:" + jobId);
throw new JobException("job not exist, jobId:" + jobId);
}
}
@ -129,6 +133,7 @@ public class JobManager<T extends AbstractJob<?>> implements Writable {
/**
* query jobs by job type
*
* @param jobTypes @JobType
* @return List<AbstractJob> job list
*/
@ -156,12 +161,12 @@ public class JobManager<T extends AbstractJob<?>> implements Writable {
public List<? extends AbstractTask> queryTasks(Long jobId) throws JobException {
checkJobExist(jobId);
return jobMap.get(jobId).queryTasks();
return jobMap.get(jobId).queryAllTasks();
}
public void triggerJob(long jobId) throws JobException {
public void triggerJob(long jobId, C context) throws JobException {
checkJobExist(jobId);
jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL);
jobScheduler.schedulerInstantJob(jobMap.get(jobId), TaskType.MANUAL, context);
}
public void replayCreateJob(T job) {
@ -191,11 +196,14 @@ public class JobManager<T extends AbstractJob<?>> implements Writable {
.add("msg", "replay delete scheduler job").build());
}
void cancelTask(Long jobId, Long taskId) throws JobException {
checkJobExist(jobId);
if (null == jobMap.get(jobId).getRunningTasks()) {
throw new JobException("task not exist,taskId:" + taskId);
public void cancelTaskById(String jobName, Long taskId) throws JobException {
for (T job : jobMap.values()) {
if (job.getJobName().equals(jobName)) {
job.cancelTaskById(taskId);
return;
}
}
throw new JobException("job not exist, jobName:" + jobName);
}
@Override
@ -205,7 +213,7 @@ public class JobManager<T extends AbstractJob<?>> implements Writable {
try {
job.write(out);
} catch (IOException e) {
log.error("write job error,jobId:" + jobId, e);
log.error("write job error, jobId:" + jobId, e);
}
});
}

View File

@ -46,7 +46,7 @@ public class TaskDisruptorGroupManager<T extends AbstractTask> {
private final Map<JobType, TaskDisruptor<T>> disruptorMap = new EnumMap<>(JobType.class);
@Getter
private TaskDisruptor<TimerJobEvent<AbstractJob<?>>> dispatchDisruptor;
private TaskDisruptor<TimerJobEvent<AbstractJob>> dispatchDisruptor;
private static final int DEFAULT_RING_BUFFER_SIZE = 1024;
@ -76,14 +76,14 @@ public class TaskDisruptorGroupManager<T extends AbstractTask> {
}
private void registerDispatchDisruptor() {
EventFactory<TimerJobEvent<AbstractJob<T>>> dispatchEventFactory = TimerJobEvent.factory();
EventFactory<TimerJobEvent<AbstractJob>> dispatchEventFactory = TimerJobEvent.factory();
ThreadFactory dispatchThreadFactory = new CustomThreadFactory("dispatch-task");
WorkHandler[] dispatchTaskExecutorHandlers = new WorkHandler[DISPATCH_TIMER_JOB_CONSUMER_THREAD_NUM];
for (int i = 0; i < DISPATCH_TIMER_JOB_CONSUMER_THREAD_NUM; i++) {
dispatchTaskExecutorHandlers[i] = new DispatchTaskHandler(this.disruptorMap);
}
EventTranslatorVararg<TimerJobEvent<AbstractJob<T>>> eventTranslator =
(event, sequence, args) -> event.setJob((AbstractJob<T>) args[0]);
EventTranslatorVararg<TimerJobEvent<AbstractJob>> eventTranslator =
(event, sequence, args) -> event.setJob((AbstractJob) args[0]);
this.dispatchDisruptor = new TaskDisruptor<>(dispatchEventFactory, DISPATCH_TIMER_JOB_QUEUE_SIZE,
dispatchThreadFactory,
new BlockingWaitStrategy(), dispatchTaskExecutorHandlers, eventTranslator);
@ -123,7 +123,7 @@ public class TaskDisruptorGroupManager<T extends AbstractTask> {
disruptorMap.put(JobType.MV, mtmvDisruptor);
}
public void dispatchTimerJob(AbstractJob<T> job) {
public void dispatchTimerJob(AbstractJob job) {
dispatchDisruptor.publishEvent(job);
}

View File

@ -18,7 +18,7 @@
package org.apache.doris.job.manager;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -29,7 +29,7 @@ import java.util.concurrent.Semaphore;
* It provides a method to acquire a semaphore token for a specific job ID with the given maximum concurrency.
* If a semaphore doesn't exist for the job ID, it creates a new one and adds it to the map.
*/
@Slf4j
@Log4j2
@UtilityClass
public class TaskTokenManager {

View File

@ -17,6 +17,8 @@
package org.apache.doris.job.scheduler;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.base.AbstractJob;
@ -30,7 +32,7 @@ import org.apache.doris.job.manager.TaskDisruptorGroupManager;
import org.apache.doris.job.task.AbstractTask;
import io.netty.util.HashedWheelTimer;
import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections.CollectionUtils;
import java.io.Closeable;
@ -39,8 +41,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slf4j
public class JobScheduler<T extends AbstractJob<?>> implements Closeable {
@Log4j2
public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
/**
* scheduler tasks, it's used to scheduler job
@ -68,6 +70,13 @@ public class JobScheduler<T extends AbstractJob<?>> implements Closeable {
*/
private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = BATCH_SCHEDULER_INTERVAL_SECONDS * 1000L;
/**
* Finished job will be cleared after 24 hours
*/
private static final long FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS =
(Config.finished_job_cleanup_threshold_time_hour > 0
? Config.finished_job_cleanup_threshold_time_hour : 24) * 3600 * 1000L;
public void start() {
timerTaskScheduler = new HashedWheelTimer(new CustomThreadFactory("timer-task-scheduler"), 1,
TimeUnit.SECONDS, HASHED_WHEEL_TIMER_TICKS_PER_WHEEL);
@ -105,20 +114,20 @@ public class JobScheduler<T extends AbstractJob<?>> implements Closeable {
//manual job will not scheduler
if (JobExecuteType.MANUAL.equals(job.getJobConfig().getExecuteType())) {
if (job.getJobConfig().isImmediate()) {
schedulerInstantJob(job, TaskType.MANUAL);
schedulerInstantJob(job, TaskType.MANUAL, null);
}
return;
}
//todo skip streaming job,improve in the future
if (JobExecuteType.INSTANT.equals(job.getJobConfig().getExecuteType())) {
schedulerInstantJob(job, TaskType.SCHEDULED);
schedulerInstantJob(job, TaskType.SCHEDULED, null);
}
}
//RECURRING job and immediate is true
if (job.getJobConfig().isImmediate()) {
job.getJobConfig().getTimerDefinition().setLatestSchedulerTimeMs(System.currentTimeMillis());
schedulerInstantJob(job, TaskType.SCHEDULED);
schedulerInstantJob(job, TaskType.SCHEDULED, null);
}
//if it's timer job and trigger last window already start, we will scheduler it immediately
cycleTimerJobScheduler(job);
@ -142,16 +151,11 @@ public class JobScheduler<T extends AbstractJob<?>> implements Closeable {
}
public void schedulerInstantJob(T job, TaskType taskType) throws JobException {
if (!job.getJobStatus().equals(JobStatus.RUNNING)) {
throw new JobException("job is not running,job id is %d", job.getJobId());
}
if (!job.isReadyForScheduling()) {
log.info("job is not ready for scheduling,job id is {}", job.getJobId());
return;
}
List<? extends AbstractTask> tasks = job.createTasks(taskType);
public void schedulerInstantJob(T job, TaskType taskType, C context) {
List<? extends AbstractTask> tasks = job.commonCreateTasks(taskType, context);
if (CollectionUtils.isEmpty(tasks)) {
log.info("job create task is empty, skip scheduler, job id is {},job name is {}", job.getJobId(),
job.getJobName());
if (job.getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
job.setJobStatus(JobStatus.FINISHED);
}
@ -166,19 +170,34 @@ public class JobScheduler<T extends AbstractJob<?>> implements Closeable {
* We will get the task in the next time window, and then hand it over to the time wheel for timing trigger
*/
private void executeTimerJobIdsWithinLastTenMinutesWindow() {
if (jobMap.isEmpty()) {
return;
}
if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
this.latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
}
this.latestBatchSchedulerTimerTaskTimeMs += BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
if (jobMap.isEmpty()) {
return;
}
for (Map.Entry<Long, T> entry : jobMap.entrySet()) {
T job = entry.getValue();
if (!job.getJobConfig().checkIsTimerJob()) {
if (job.getJobStatus().equals(JobStatus.FINISHED)) {
clearFinishedJob(job);
continue;
}
if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) {
continue;
}
cycleTimerJobScheduler(job);
}
}
private void clearFinishedJob(T job) {
if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS < System.currentTimeMillis()) {
return;
}
try {
Env.getCurrentEnv().getJobManager().unregisterJob(job.getJobId());
} catch (JobException e) {
log.error("clear finish job error, job id is {}", job.getJobId(), e);
}
}
}

View File

@ -26,10 +26,10 @@ import org.apache.doris.job.exception.JobException;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import lombok.extern.log4j.Log4j2;
@Data
@Slf4j
@Log4j2
public abstract class AbstractTask implements Task {
@SerializedName(value = "jid")
@ -49,9 +49,12 @@ public abstract class AbstractTask implements Task {
@SerializedName(value = "tt")
private TaskType taskType;
@SerializedName(value = "emg")
private String errMsg;
@Override
public void onFail(String msg) {
status = TaskStatus.FAILD;
public void onFail(String msg) throws JobException {
status = TaskStatus.FAILED;
if (!isCallable()) {
return;
}
@ -60,10 +63,10 @@ public abstract class AbstractTask implements Task {
@Override
public void onFail() throws JobException {
if (TaskStatus.CANCEL.equals(status)) {
if (TaskStatus.CANCELED.equals(status)) {
return;
}
status = TaskStatus.FAILD;
status = TaskStatus.FAILED;
setFinishTimeMs(System.currentTimeMillis());
if (!isCallable()) {
return;
@ -73,7 +76,7 @@ public abstract class AbstractTask implements Task {
}
private boolean isCallable() {
if (status.equals(TaskStatus.CANCEL)) {
if (status.equals(TaskStatus.CANCELED)) {
return false;
}
if (null != Env.getCurrentEnv().getJobManager().getJob(jobId)) {
@ -84,6 +87,9 @@ public abstract class AbstractTask implements Task {
@Override
public void onSuccess() throws JobException {
if (TaskStatus.CANCELED.equals(status)) {
return;
}
status = TaskStatus.SUCCESS;
setFinishTimeMs(System.currentTimeMillis());
if (!isCallable()) {
@ -99,7 +105,7 @@ public abstract class AbstractTask implements Task {
@Override
public void cancel() throws JobException {
status = TaskStatus.CANCEL;
status = TaskStatus.CANCELED;
}
@Override
@ -115,12 +121,12 @@ public abstract class AbstractTask implements Task {
onSuccess();
} catch (Exception e) {
onFail();
log.warn("execute task error, job id is {},task id is {}", jobId, taskId, e);
log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e);
}
}
public boolean isCancelled() {
return status.equals(TaskStatus.CANCEL);
return status.equals(TaskStatus.CANCELED);
}
public String getJobName() {

View File

@ -54,7 +54,7 @@ public interface Task {
*
* @param msg The error message associated with the failure.
*/
void onFail(String msg);
void onFail(String msg) throws JobException;
/**
* This method is called when the task executes successfully.

View File

@ -52,6 +52,7 @@ public class MTMVJobManager implements MTMVHookService {
/**
* create MTMVJob
*
* @param mtmv
* @throws DdlException
*/
@ -112,6 +113,7 @@ public class MTMVJobManager implements MTMVHookService {
/**
* drop MTMVJob
*
* @param mtmv
* @throws DdlException
*/
@ -142,6 +144,7 @@ public class MTMVJobManager implements MTMVHookService {
/**
* drop MTMVJob and then create MTMVJob
*
* @param mtmv
* @param alterMTMV
* @throws DdlException
@ -156,6 +159,7 @@ public class MTMVJobManager implements MTMVHookService {
/**
* trigger MTMVJob
*
* @param info
* @throws DdlException
* @throws MetaNotFoundException
@ -170,7 +174,7 @@ public class MTMVJobManager implements MTMVHookService {
throw new DdlException("jobs not normal,should have one job,but job num is: " + jobs.size());
}
try {
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId());
Env.getCurrentEnv().getJobManager().triggerJob(jobs.get(0).getJobId(), null);
} catch (JobException e) {
e.printStackTrace();
throw new DdlException(e.getMessage());

View File

@ -588,4 +588,8 @@ public class InsertExecutor {
throw new AnalysisException(e.getMessage(), e);
}
}
public Coordinator getCoordinator() {
return coordinator;
}
}

View File

@ -162,6 +162,9 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
insertExecutor.finalizeSink(sink, physicalOlapTableSink.isPartialUpdate(),
physicalOlapTableSink.isFromNativeInsertStmt());
executor.setProfileType(ProfileType.LOAD);
// We exposed @StmtExecutor#cancel as a unified entry point for statement interruption
// so we need to set this here
executor.setCoord(insertExecutor.getCoordinator());
insertExecutor.executeSingleInsertTransaction(executor, jobId);
}

View File

@ -37,6 +37,7 @@ import org.apache.doris.analysis.AlterColumnStatsStmt;
import org.apache.doris.analysis.AlterDatabasePropertyStmt;
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
import org.apache.doris.analysis.AlterDatabaseRename;
import org.apache.doris.analysis.AlterJobStatusStmt;
import org.apache.doris.analysis.AlterPolicyStmt;
import org.apache.doris.analysis.AlterResourceStmt;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
@ -51,6 +52,7 @@ import org.apache.doris.analysis.CancelAlterSystemStmt;
import org.apache.doris.analysis.CancelAlterTableStmt;
import org.apache.doris.analysis.CancelBackupStmt;
import org.apache.doris.analysis.CancelExportStmt;
import org.apache.doris.analysis.CancelJobTaskStmt;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CleanLabelStmt;
import org.apache.doris.analysis.CleanProfileStmt;
@ -95,7 +97,6 @@ import org.apache.doris.analysis.DropWorkloadGroupStmt;
import org.apache.doris.analysis.GrantStmt;
import org.apache.doris.analysis.InstallPluginStmt;
import org.apache.doris.analysis.KillAnalysisJobStmt;
import org.apache.doris.analysis.PauseJobStmt;
import org.apache.doris.analysis.PauseRoutineLoadStmt;
import org.apache.doris.analysis.PauseSyncJobStmt;
import org.apache.doris.analysis.RecoverDbStmt;
@ -106,12 +107,10 @@ import org.apache.doris.analysis.RefreshDbStmt;
import org.apache.doris.analysis.RefreshLdapStmt;
import org.apache.doris.analysis.RefreshTableStmt;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.ResumeJobStmt;
import org.apache.doris.analysis.ResumeRoutineLoadStmt;
import org.apache.doris.analysis.ResumeSyncJobStmt;
import org.apache.doris.analysis.RevokeStmt;
import org.apache.doris.analysis.SetUserPropertyStmt;
import org.apache.doris.analysis.StopJobStmt;
import org.apache.doris.analysis.StopRoutineLoadStmt;
import org.apache.doris.analysis.StopSyncJobStmt;
import org.apache.doris.analysis.SyncStmt;
@ -121,8 +120,6 @@ import org.apache.doris.catalog.EncryptKeyHelper;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.common.JobType;
import org.apache.doris.load.sync.SyncJobManager;
import org.apache.doris.persist.CleanQueryStatsInfo;
import org.apache.doris.statistics.StatisticsRepository;
@ -190,24 +187,17 @@ public class DdlExecutor {
} catch (Exception e) {
throw new DdlException(e.getMessage());
}
} else if (ddlStmt instanceof StopJobStmt) {
StopJobStmt stmt = (StopJobStmt) ddlStmt;
} else if (ddlStmt instanceof AlterJobStatusStmt) {
AlterJobStatusStmt stmt = (AlterJobStatusStmt) ddlStmt;
try {
env.getJobManager().unregisterJob(stmt.getName(), JobType.INSERT);
env.getJobManager().alterJobStatus(stmt.getJobName(), stmt.getJobStatus());
} catch (Exception e) {
throw new DdlException(e.getMessage());
}
} else if (ddlStmt instanceof PauseJobStmt) {
PauseJobStmt stmt = (PauseJobStmt) ddlStmt;
} else if (ddlStmt instanceof CancelJobTaskStmt) {
CancelJobTaskStmt stmt = (CancelJobTaskStmt) ddlStmt;
try {
env.getJobManager().alterJobStatus(stmt.getName(), JobStatus.PAUSED, JobType.INSERT);
} catch (Exception e) {
throw new DdlException(e.getMessage());
}
} else if (ddlStmt instanceof ResumeJobStmt) {
ResumeJobStmt stmt = (ResumeJobStmt) ddlStmt;
try {
env.getJobManager().alterJobStatus(stmt.getName(), JobStatus.RUNNING, JobType.INSERT);
env.getJobManager().cancelTaskById(stmt.getJobName(), stmt.getTaskId());
} catch (Exception e) {
throw new DdlException(e.getMessage());
}

View File

@ -1429,7 +1429,7 @@ public class ShowExecutor {
return;
}
org.apache.doris.job.base.AbstractJob job = jobs.get(0);
List<AbstractTask> jobTasks = job.queryTasks();
List<AbstractTask> jobTasks = job.queryAllTasks();
if (CollectionUtils.isEmpty(jobTasks)) {
resultSet = new ShowResultSet(job.getTaskMetaData(), rows);
return;

View File

@ -177,6 +177,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@ -214,6 +215,8 @@ public class StmtExecutor {
private StatementBase parsedStmt;
private Analyzer analyzer;
private ProfileType profileType = ProfileType.QUERY;
@Setter
private volatile Coordinator coord = null;
private MasterOpExecutor masterOpExecutor = null;
private RedirectStatus redirectStatus = null;