pick: https://github.com/apache/doris/pull/45995
This commit is contained in:
@ -149,12 +149,12 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
|
||||
private Lock createTaskLock = new ReentrantLock();
|
||||
|
||||
@Override
|
||||
public void cancelAllTasks() throws JobException {
|
||||
public void cancelAllTasks(boolean needWaitCancelComplete) throws JobException {
|
||||
if (CollectionUtils.isEmpty(runningTasks)) {
|
||||
return;
|
||||
}
|
||||
for (T task : runningTasks) {
|
||||
task.cancel();
|
||||
task.cancel(needWaitCancelComplete);
|
||||
canceledTaskCount.incrementAndGet();
|
||||
}
|
||||
runningTasks = new CopyOnWriteArrayList<>();
|
||||
@ -184,7 +184,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
|
||||
throw new JobException("no running task");
|
||||
}
|
||||
runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst()
|
||||
.orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel();
|
||||
.orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel(true);
|
||||
runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
|
||||
canceledTaskCount.incrementAndGet();
|
||||
if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
|
||||
@ -292,7 +292,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
|
||||
this.finishTimeMs = System.currentTimeMillis();
|
||||
}
|
||||
if (JobStatus.PAUSED.equals(newJobStatus) || JobStatus.STOPPED.equals(newJobStatus)) {
|
||||
cancelAllTasks();
|
||||
cancelAllTasks(JobStatus.STOPPED.equals(newJobStatus) ? false : true);
|
||||
}
|
||||
jobStatus = newJobStatus;
|
||||
}
|
||||
|
||||
@ -101,7 +101,7 @@ public interface Job<T extends AbstractTask, C> {
|
||||
* Cancels all running tasks of this job.
|
||||
* @throws JobException If cancelling a running task fails.
|
||||
*/
|
||||
void cancelAllTasks() throws JobException;
|
||||
void cancelAllTasks(boolean needWaitCancelComplete) throws JobException;
|
||||
|
||||
/**
|
||||
* register job
|
||||
|
||||
@ -66,7 +66,7 @@ public class DispatchTaskHandler<T extends AbstractJob> implements WorkHandler<T
|
||||
JobType jobType = event.getJob().getJobType();
|
||||
for (AbstractTask task : tasks) {
|
||||
if (!disruptorMap.get(jobType).addTask(task)) {
|
||||
task.cancel();
|
||||
task.cancel(true);
|
||||
continue;
|
||||
}
|
||||
log.info("dispatch timer job success, job id is {}, task id is {}",
|
||||
|
||||
@ -297,12 +297,12 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelAllTasks() throws JobException {
|
||||
public void cancelAllTasks(boolean needWaitCancelComplete) throws JobException {
|
||||
try {
|
||||
if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
|
||||
checkAuth("CANCEL LOAD");
|
||||
}
|
||||
super.cancelAllTasks();
|
||||
super.cancelAllTasks(needWaitCancelComplete);
|
||||
this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel");
|
||||
} catch (DdlException e) {
|
||||
throw new JobException(e);
|
||||
|
||||
@ -223,7 +223,7 @@ public class InsertTask extends AbstractTask {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void executeCancelLogic() {
|
||||
protected void executeCancelLogic(boolean needWaitCancelComplete) {
|
||||
if (isFinished.get() || isCanceled.get()) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -276,10 +276,10 @@ public class MTMVTask extends AbstractTask {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void executeCancelLogic() {
|
||||
protected synchronized void executeCancelLogic(boolean needWaitCancelComplete) {
|
||||
LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
|
||||
if (executor != null) {
|
||||
executor.cancel("mtmv task cancelled");
|
||||
executor.cancel("mtmv task cancelled", needWaitCancelComplete);
|
||||
}
|
||||
after();
|
||||
}
|
||||
|
||||
@ -129,16 +129,16 @@ public abstract class AbstractTask implements Task {
|
||||
/**
|
||||
* Cancels the ongoing task, updating its status to {@link TaskStatus#CANCELED} and releasing associated resources.
|
||||
* This method encapsulates the core cancellation logic, calling the abstract method
|
||||
* {@link #executeCancelLogic()} for task-specific actions.
|
||||
* {@link #executeCancelLogic(boolean)} for task-specific actions.
|
||||
*
|
||||
* @throws JobException If an error occurs during the cancellation process, a new JobException is thrown wrapping
|
||||
* the original exception.
|
||||
*/
|
||||
@Override
|
||||
public void cancel() throws JobException {
|
||||
public void cancel(boolean needWaitCancelComplete) throws JobException {
|
||||
try {
|
||||
status = TaskStatus.CANCELED;
|
||||
executeCancelLogic();
|
||||
executeCancelLogic(needWaitCancelComplete);
|
||||
} catch (Exception e) {
|
||||
log.warn("cancel task failed, job id is {}, task id is {}", jobId, taskId, e);
|
||||
throw new JobException(e);
|
||||
@ -153,7 +153,7 @@ public abstract class AbstractTask implements Task {
|
||||
*
|
||||
* @throws Exception Any exception that might occur during the cancellation process in the subclass.
|
||||
*/
|
||||
protected abstract void executeCancelLogic() throws Exception;
|
||||
protected abstract void executeCancelLogic(boolean needWaitCancelComplete) throws Exception;
|
||||
|
||||
@Override
|
||||
public void before() throws JobException {
|
||||
|
||||
@ -63,8 +63,10 @@ public interface Task {
|
||||
/**
|
||||
* This method is called to cancel the execution of the task.
|
||||
* Implementations should define the necessary steps to cancel the task.
|
||||
*
|
||||
* @param needWaitCancelComplete Do we need to wait for the cancellation to be completed.
|
||||
*/
|
||||
void cancel() throws JobException;
|
||||
void cancel(boolean needWaitCancelComplete) throws JobException;
|
||||
|
||||
/**
|
||||
* get info for tvf `tasks`
|
||||
|
||||
@ -1488,7 +1488,7 @@ public class StmtExecutor {
|
||||
}
|
||||
|
||||
// Because this is called by other thread
|
||||
public void cancel(String message) {
|
||||
public void cancel(String message, boolean needWaitCancelComplete) {
|
||||
Optional<InsertOverwriteTableCommand> insertOverwriteTableCommand = getInsertOverwriteTableCommand();
|
||||
if (insertOverwriteTableCommand.isPresent()) {
|
||||
// If the be scheduling has not been triggered yet, cancel the scheduling first
|
||||
@ -1504,12 +1504,16 @@ public class StmtExecutor {
|
||||
if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) {
|
||||
Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context);
|
||||
}
|
||||
if (insertOverwriteTableCommand.isPresent()) {
|
||||
if (insertOverwriteTableCommand.isPresent() && needWaitCancelComplete) {
|
||||
// Wait for the command to run or cancel completion
|
||||
insertOverwriteTableCommand.get().waitNotRunning();
|
||||
}
|
||||
}
|
||||
|
||||
public void cancel(String message) {
|
||||
cancel(message, true);
|
||||
}
|
||||
|
||||
private Optional<InsertOverwriteTableCommand> getInsertOverwriteTableCommand() {
|
||||
if (parsedStmt instanceof LogicalPlanAdapter) {
|
||||
LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) parsedStmt;
|
||||
|
||||
Reference in New Issue
Block a user