[Fix](job)Reclaim resources held by finished tasks (#34506)

closeOrReleaseResources(), which is responsible for safely closing or releasing the stmtExecutor, command, and ctx objects. The method follows a null check pattern to avoid NullPointerExceptions and ensures that resources are properly cleaned up when they are no longer needed. This improves code readability and maintains a consistent approach to resource management.
This commit is contained in:
Calvin Kirs
2024-05-08 18:41:58 +08:00
committed by yiguolei
parent 7c56c17ecc
commit 19c20247f4
3 changed files with 71 additions and 10 deletions

View File

@ -166,7 +166,19 @@ public class InsertTask extends AbstractTask {
}
super.before();
}
@Override
protected void closeOrReleaseResources() {
if (null != stmtExecutor) {
stmtExecutor = null;
}
if (null != command) {
command = null;
}
if (null != ctx) {
ctx = null;
}
}
protected TUniqueId generateQueryId(String taskIdString) {
@ -202,7 +214,7 @@ public class InsertTask extends AbstractTask {
}
@Override
public void cancel() throws JobException {
protected void executeCancelLogic() {
if (isFinished.get() || isCanceled.get()) {
return;
}
@ -210,7 +222,6 @@ public class InsertTask extends AbstractTask {
if (null != stmtExecutor) {
stmtExecutor.cancel();
}
super.cancel();
}
@Override

View File

@ -219,7 +219,7 @@ public class MTMVTask extends AbstractTask {
}
private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds,
Map<TableIf, String> tableWithPartKey)
Map<TableIf, String> tableWithPartKey)
throws Exception {
TUniqueId queryId = generateQueryId();
lastQueryId = DebugUtil.printId(queryId);
@ -252,9 +252,8 @@ public class MTMVTask extends AbstractTask {
}
@Override
public synchronized void cancel() throws JobException {
protected synchronized void executeCancelLogic() {
LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
super.cancel();
if (executor != null) {
executor.cancel();
}
@ -380,10 +379,23 @@ public class MTMVTask extends AbstractTask {
.addMTMVTaskResult(new TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation,
partitionSnapshots);
}
mtmv = null;
relation = null;
executor = null;
partitionSnapshots = null;
}
@Override
protected void closeOrReleaseResources() {
if (null != mtmv) {
mtmv = null;
}
if (null != executor) {
executor = null;
}
if (null != relation) {
relation = null;
}
if (null != partitionSnapshots) {
partitionSnapshots = null;
}
}
private Map<TableIf, String> getIncrementalTableMap() throws AnalysisException {

View File

@ -96,6 +96,18 @@ public abstract class AbstractTask implements Task {
return false;
}
/**
* Closes or releases all allocated resources such as database connections, file streams, or any other
* external system handles that were utilized during the task execution. This method is invoked
* unconditionally, ensuring that resources are properly managed whether the task completes
* successfully, fails, or is canceled. It is crucial for preventing resource leaks and maintaining
* the overall health and efficiency of the application.
* <p>
* Note: Implementations of this method should handle potential exceptions internally and log them
* appropriately to avoid interrupting the normal flow of cleanup operations.
*/
protected abstract void closeOrReleaseResources();
@Override
public void onSuccess() throws JobException {
if (TaskStatus.CANCELED.equals(status)) {
@ -114,11 +126,35 @@ public abstract class AbstractTask implements Task {
job.onTaskSuccess(this);
}
/**
* Cancels the ongoing task, updating its status to {@link TaskStatus#CANCELED} and releasing associated resources.
* This method encapsulates the core cancellation logic, calling the abstract method
* {@link #executeCancelLogic()} for task-specific actions.
*
* @throws JobException If an error occurs during the cancellation process, a new JobException is thrown wrapping
* the original exception.
*/
@Override
public void cancel() throws JobException {
status = TaskStatus.CANCELED;
try {
executeCancelLogic();
status = TaskStatus.CANCELED;
} catch (Exception e) {
log.warn("cancel task failed, job id is {}, task id is {}", jobId, taskId, e);
throw new JobException(e);
} finally {
closeOrReleaseResources();
}
}
/**
* Abstract method for implementing the task-specific cancellation logic.
* Subclasses must override this method to provide their own implementation of how a task should be canceled.
*
* @throws Exception Any exception that might occur during the cancellation process in the subclass.
*/
protected abstract void executeCancelLogic() throws Exception;
@Override
public void before() throws JobException {
status = TaskStatus.RUNNING;
@ -134,6 +170,8 @@ public abstract class AbstractTask implements Task {
this.errMsg = e.getMessage();
onFail();
log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e);
} finally {
closeOrReleaseResources();
}
}