From 772f181e9492676c463e0b65c3de20707e0c3966 Mon Sep 17 00:00:00 2001 From: AKIRA <33112463+Kikyou1997@users.noreply.github.com> Date: Thu, 23 Nov 2023 18:18:19 +0900 Subject: [PATCH] [fix](stats) Fix thread leaks when doing checkpoint (#27334) --- .../org/apache/doris/statistics/AnalysisManager.java | 1 - .../apache/doris/statistics/AnalysisTaskExecutor.java | 11 ++--------- .../apache/doris/statistics/StatisticsCollector.java | 1 - 3 files changed, 2 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index c6deaf0268..73df00374e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -248,7 +248,6 @@ public class AnalysisManager implements Writable { if (!Env.isCheckpointThread()) { this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num); this.statisticsCache = new StatisticsCache(); - taskExecutor.start(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index 58bae9fe66..fb4530837e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -32,7 +32,7 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -public class AnalysisTaskExecutor extends Thread { +public class AnalysisTaskExecutor { private static final Logger LOG = LogManager.getLogger(AnalysisTaskExecutor.class); @@ -50,19 +50,12 @@ public class AnalysisTaskExecutor extends Thread { TimeUnit.DAYS, new LinkedBlockingQueue<>(), new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE), "Analysis Job Executor", true); + cancelExpiredTask(); } else { executors = null; } } - @Override - public void run() { - if (Env.isCheckpointThread()) { - return; - } - cancelExpiredTask(); - } - private void cancelExpiredTask() { String name = "Expired Analysis Task Killer"; Thread t = new Thread(this::doCancelExpiredJob, name); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java index 4c77d42cfe..f71d589d4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java @@ -40,7 +40,6 @@ public abstract class StatisticsCollector extends MasterDaemon { public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) { super(name, intervalMs); this.analysisTaskExecutor = analysisTaskExecutor; - analysisTaskExecutor.start(); } @Override