From 7d5e8a13bb4df943e2c25dd8799489ebc76dbf7d Mon Sep 17 00:00:00 2001 From: zhangdong Date: Thu, 26 Dec 2024 18:52:14 +0800 Subject: [PATCH] [enhance](mtmv)When drop MTMV, no longer wait for task cancel to complete (#45995) (#46025) pick: https://github.com/apache/doris/pull/45995 --- .../main/java/org/apache/doris/job/base/AbstractJob.java | 8 ++++---- .../src/main/java/org/apache/doris/job/base/Job.java | 2 +- .../apache/doris/job/executor/DispatchTaskHandler.java | 2 +- .../org/apache/doris/job/extensions/insert/InsertJob.java | 4 ++-- .../apache/doris/job/extensions/insert/InsertTask.java | 2 +- .../org/apache/doris/job/extensions/mtmv/MTMVTask.java | 4 ++-- .../main/java/org/apache/doris/job/task/AbstractTask.java | 8 ++++---- .../src/main/java/org/apache/doris/job/task/Task.java | 4 +++- .../src/main/java/org/apache/doris/qe/StmtExecutor.java | 8 ++++++-- 9 files changed, 24 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 906b86494f..b6f62f5121 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -149,12 +149,12 @@ public abstract class AbstractJob implements Job(); @@ -184,7 +184,7 @@ public abstract class AbstractJob implements Job task.getTaskId().equals(taskId)).findFirst() - .orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel(); + .orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel(true); runningTasks.removeIf(task -> task.getTaskId().equals(taskId)); canceledTaskCount.incrementAndGet(); if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) { @@ -292,7 +292,7 @@ public abstract class AbstractJob implements Job { * Cancels all running tasks of this job. * @throws JobException If cancelling a running task fails. */ - void cancelAllTasks() throws JobException; + void cancelAllTasks(boolean needWaitCancelComplete) throws JobException; /** * register job diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java index b8f726c4a0..56222fd3e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java @@ -66,7 +66,7 @@ public class DispatchTaskHandler implements WorkHandler> impl } @Override - public void cancelAllTasks() throws JobException { + public void cancelAllTasks(boolean needWaitCancelComplete) throws JobException { try { if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { checkAuth("CANCEL LOAD"); } - super.cancelAllTasks(); + super.cancelAllTasks(needWaitCancelComplete); this.failMsg = new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"); } catch (DdlException e) { throw new JobException(e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 91f6ebe41e..94484fdcd9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -223,7 +223,7 @@ public class InsertTask extends AbstractTask { } @Override - protected void executeCancelLogic() { + protected void executeCancelLogic(boolean needWaitCancelComplete) { if (isFinished.get() || isCanceled.get()) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 3ab1df4734..188d6b5f45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -276,10 +276,10 @@ public class MTMVTask extends AbstractTask { } @Override - protected synchronized void executeCancelLogic() { + protected synchronized void executeCancelLogic(boolean needWaitCancelComplete) { LOG.info("mtmv task cancel, taskId: {}", super.getTaskId()); if (executor != null) { - executor.cancel("mtmv task cancelled"); + executor.cancel("mtmv task cancelled", needWaitCancelComplete); } after(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index 8a230c0bd3..b356bc58d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -129,16 +129,16 @@ public abstract class AbstractTask implements Task { /** * Cancels the ongoing task, updating its status to {@link TaskStatus#CANCELED} and releasing associated resources. * This method encapsulates the core cancellation logic, calling the abstract method - * {@link #executeCancelLogic()} for task-specific actions. + * {@link #executeCancelLogic(boolean)} for task-specific actions. * * @throws JobException If an error occurs during the cancellation process, a new JobException is thrown wrapping * the original exception. */ @Override - public void cancel() throws JobException { + public void cancel(boolean needWaitCancelComplete) throws JobException { try { status = TaskStatus.CANCELED; - executeCancelLogic(); + executeCancelLogic(needWaitCancelComplete); } catch (Exception e) { log.warn("cancel task failed, job id is {}, task id is {}", jobId, taskId, e); throw new JobException(e); @@ -153,7 +153,7 @@ public abstract class AbstractTask implements Task { * * @throws Exception Any exception that might occur during the cancellation process in the subclass. */ - protected abstract void executeCancelLogic() throws Exception; + protected abstract void executeCancelLogic(boolean needWaitCancelComplete) throws Exception; @Override public void before() throws JobException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java index ee205c55c3..d184f64707 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java @@ -63,8 +63,10 @@ public interface Task { /** * This method is called to cancel the execution of the task. * Implementations should define the necessary steps to cancel the task. + * + * @param needWaitCancelComplete Do we need to wait for the cancellation to be completed. */ - void cancel() throws JobException; + void cancel(boolean needWaitCancelComplete) throws JobException; /** * get info for tvf `tasks` diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index e490fa2b1c..ad2c48ed17 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1488,7 +1488,7 @@ public class StmtExecutor { } // Because this is called by other thread - public void cancel(String message) { + public void cancel(String message, boolean needWaitCancelComplete) { Optional insertOverwriteTableCommand = getInsertOverwriteTableCommand(); if (insertOverwriteTableCommand.isPresent()) { // If the be scheduling has not been triggered yet, cancel the scheduling first @@ -1504,12 +1504,16 @@ public class StmtExecutor { if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) { Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context); } - if (insertOverwriteTableCommand.isPresent()) { + if (insertOverwriteTableCommand.isPresent() && needWaitCancelComplete) { // Wait for the command to run or cancel completion insertOverwriteTableCommand.get().waitNotRunning(); } } + public void cancel(String message) { + cancel(message, true); + } + private Optional getInsertOverwriteTableCommand() { if (parsedStmt instanceof LogicalPlanAdapter) { LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) parsedStmt;