From 19c20247f466eecb12a18a70040e3653d6326953 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Wed, 8 May 2024 18:41:58 +0800 Subject: [PATCH] [Fix](job)Reclaim resources held by finished tasks (#34506) closeOrReleaseResources(), which is responsible for safely closing or releasing the stmtExecutor, command, and ctx objects. The method follows a null check pattern to avoid NullPointerExceptions and ensures that resources are properly cleaned up when they are no longer needed. This improves code readability and maintains a consistent approach to resource management. --- .../job/extensions/insert/InsertTask.java | 15 ++++++- .../doris/job/extensions/mtmv/MTMVTask.java | 26 ++++++++---- .../apache/doris/job/task/AbstractTask.java | 40 ++++++++++++++++++- 3 files changed, 71 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 6e6f59758b..0fe2a8364a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -166,7 +166,19 @@ public class InsertTask extends AbstractTask { } super.before(); + } + @Override + protected void closeOrReleaseResources() { + if (null != stmtExecutor) { + stmtExecutor = null; + } + if (null != command) { + command = null; + } + if (null != ctx) { + ctx = null; + } } protected TUniqueId generateQueryId(String taskIdString) { @@ -202,7 +214,7 @@ public class InsertTask extends AbstractTask { } @Override - public void cancel() throws JobException { + protected void executeCancelLogic() { if (isFinished.get() || isCanceled.get()) { return; } @@ -210,7 +222,6 @@ public class InsertTask extends AbstractTask { if (null != stmtExecutor) { stmtExecutor.cancel(); } - super.cancel(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 8924b229b2..efcdc4f564 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -219,7 +219,7 @@ public class MTMVTask extends AbstractTask { } private void exec(ConnectContext ctx, Set refreshPartitionIds, - Map tableWithPartKey) + Map tableWithPartKey) throws Exception { TUniqueId queryId = generateQueryId(); lastQueryId = DebugUtil.printId(queryId); @@ -252,9 +252,8 @@ public class MTMVTask extends AbstractTask { } @Override - public synchronized void cancel() throws JobException { + protected synchronized void executeCancelLogic() { LOG.info("mtmv task cancel, taskId: {}", super.getTaskId()); - super.cancel(); if (executor != null) { executor.cancel(); } @@ -380,10 +379,23 @@ public class MTMVTask extends AbstractTask { .addMTMVTaskResult(new TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation, partitionSnapshots); } - mtmv = null; - relation = null; - executor = null; - partitionSnapshots = null; + + } + + @Override + protected void closeOrReleaseResources() { + if (null != mtmv) { + mtmv = null; + } + if (null != executor) { + executor = null; + } + if (null != relation) { + relation = null; + } + if (null != partitionSnapshots) { + partitionSnapshots = null; + } } private Map getIncrementalTableMap() throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index 7bd2e58f87..25803085e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -96,6 +96,18 @@ public abstract class AbstractTask implements Task { return false; } + /** + * Closes or releases all allocated resources such as database connections, file streams, or any other + * external system handles that were utilized during the task execution. This method is invoked + * unconditionally, ensuring that resources are properly managed whether the task completes + * successfully, fails, or is canceled. It is crucial for preventing resource leaks and maintaining + * the overall health and efficiency of the application. + *

+ * Note: Implementations of this method should handle potential exceptions internally and log them + * appropriately to avoid interrupting the normal flow of cleanup operations. + */ + protected abstract void closeOrReleaseResources(); + @Override public void onSuccess() throws JobException { if (TaskStatus.CANCELED.equals(status)) { @@ -114,11 +126,35 @@ public abstract class AbstractTask implements Task { job.onTaskSuccess(this); } + /** + * Cancels the ongoing task, updating its status to {@link TaskStatus#CANCELED} and releasing associated resources. + * This method encapsulates the core cancellation logic, calling the abstract method + * {@link #executeCancelLogic()} for task-specific actions. + * + * @throws JobException If an error occurs during the cancellation process, a new JobException is thrown wrapping + * the original exception. + */ @Override public void cancel() throws JobException { - status = TaskStatus.CANCELED; + try { + executeCancelLogic(); + status = TaskStatus.CANCELED; + } catch (Exception e) { + log.warn("cancel task failed, job id is {}, task id is {}", jobId, taskId, e); + throw new JobException(e); + } finally { + closeOrReleaseResources(); + } } + /** + * Abstract method for implementing the task-specific cancellation logic. + * Subclasses must override this method to provide their own implementation of how a task should be canceled. + * + * @throws Exception Any exception that might occur during the cancellation process in the subclass. + */ + protected abstract void executeCancelLogic() throws Exception; + @Override public void before() throws JobException { status = TaskStatus.RUNNING; @@ -134,6 +170,8 @@ public abstract class AbstractTask implements Task { this.errMsg = e.getMessage(); onFail(); log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e); + } finally { + closeOrReleaseResources(); } }