[fix](stats) Fix thread leaks when doing checkpoint (#27334)
This commit is contained in:
@ -248,7 +248,6 @@ public class AnalysisManager implements Writable {
|
||||
if (!Env.isCheckpointThread()) {
|
||||
this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num);
|
||||
this.statisticsCache = new StatisticsCache();
|
||||
taskExecutor.start();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -32,7 +32,7 @@ import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class AnalysisTaskExecutor extends Thread {
|
||||
public class AnalysisTaskExecutor {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(AnalysisTaskExecutor.class);
|
||||
|
||||
@ -50,19 +50,12 @@ public class AnalysisTaskExecutor extends Thread {
|
||||
TimeUnit.DAYS, new LinkedBlockingQueue<>(),
|
||||
new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
|
||||
"Analysis Job Executor", true);
|
||||
cancelExpiredTask();
|
||||
} else {
|
||||
executors = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (Env.isCheckpointThread()) {
|
||||
return;
|
||||
}
|
||||
cancelExpiredTask();
|
||||
}
|
||||
|
||||
private void cancelExpiredTask() {
|
||||
String name = "Expired Analysis Task Killer";
|
||||
Thread t = new Thread(this::doCancelExpiredJob, name);
|
||||
|
||||
@ -40,7 +40,6 @@ public abstract class StatisticsCollector extends MasterDaemon {
|
||||
public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) {
|
||||
super(name, intervalMs);
|
||||
this.analysisTaskExecutor = analysisTaskExecutor;
|
||||
analysisTaskExecutor.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user