diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 7da959e6f9..d9f9f9582f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -248,7 +248,8 @@ public class AnalysisManager implements Writable { public AnalysisManager() { if (!Env.isCheckpointThread()) { - this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num); + this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num, + Integer.MAX_VALUE); this.statisticsCache = new StatisticsCache(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index fb4530837e..3bdccaca04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -43,11 +43,15 @@ public class AnalysisTaskExecutor { Comparator.comparingLong(AnalysisTaskWrapper::getStartTime)); public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) { + this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE); + } + + public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize) { if (!Env.isCheckpointThread()) { executors = ThreadPoolManager.newDaemonThreadPool( simultaneouslyRunningTaskNum, simultaneouslyRunningTaskNum, 0, - TimeUnit.DAYS, new LinkedBlockingQueue<>(), + TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize), new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE), "Analysis Job Executor", true); cancelExpiredTask(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java index c9eee3a01b..3e2b9c8bc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java @@ -96,7 +96,7 @@ public class StatisticConstants { public static final int ANALYZE_TIMEOUT_IN_SEC = 43200; - public static final int SUBMIT_JOB_LIMIT = 5; + public static final int TASK_QUEUE_CAP = 10; public static final int AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD = 70; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 97061018ae..8fd3be4b6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -52,7 +52,8 @@ public class StatisticsAutoCollector extends StatisticsCollector { public StatisticsAutoCollector() { super("Automatic Analyzer", TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes), - new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num)); + new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num, + StatisticConstants.TASK_QUEUE_CAP)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java index 63dcdab09a..9d4c311523 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java @@ -35,8 +35,6 @@ public abstract class StatisticsCollector extends MasterDaemon { protected final AnalysisTaskExecutor analysisTaskExecutor; - protected int submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT; - public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) { super(name, intervalMs); this.analysisTaskExecutor = analysisTaskExecutor; @@ -54,7 +52,6 @@ public abstract class StatisticsCollector extends MasterDaemon { if (Env.isCheckpointThread()) { return; } - submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT; if (Env.getCurrentEnv().getAnalysisManager().hasUnFinished()) { LOG.info("Analyze tasks those submitted in last time is not finished, skip"); return; @@ -72,9 +69,6 @@ public abstract class StatisticsCollector extends MasterDaemon { // No statistics need to be collected or updated return; } - if (submittedThisRound-- < 0) { - return; - } Map analysisTasks = new HashMap<>(); AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);