From cfb6af295f1c1f0e39838efa9e67d9470717a14c Mon Sep 17 00:00:00 2001 From: AKIRA <33112463+Kikyou1997@users.noreply.github.com> Date: Wed, 22 Nov 2023 17:53:31 +0900 Subject: [PATCH] [fix](stats) Fix creating too many tasks on new env #27364 If there are huge datasets with many databases, many tables and many columns, the auto collector might submit too many jobs, which would occupy too much FE memory. In this PR, limit the number of jobs that can be submitted in each round to 5. --- .../apache/doris/statistics/AnalysisJob.java | 33 ++++++++++--------- .../doris/statistics/AnalysisManager.java | 4 +++ .../doris/statistics/StatisticConstants.java | 2 ++ .../statistics/StatisticsAutoCollector.java | 2 +- .../doris/statistics/StatisticsCollector.java | 9 +++-- .../doris/statistics/AnalysisJobTest.java | 8 ++--- 6 files changed, 34 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java index acea77b1b7..41e4b6b317 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java @@ -30,6 +30,7 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -45,10 +46,6 @@ public class AnalysisJob { protected List buf; - protected int totalTaskCount; - - protected int queryFinishedTaskCount; - protected StmtExecutor stmtExecutor; protected boolean killed; @@ -63,10 +60,9 @@ public class AnalysisJob { for (BaseAnalysisTask task : queryingTask) { task.job = this; } - this.queryingTask = new HashSet<>(queryingTask); - this.queryFinished = new HashSet<>(); + this.queryingTask = Collections.synchronizedSet(new HashSet<>(queryingTask)); + this.queryFinished = Collections.synchronizedSet(new HashSet<>()); this.buf = new ArrayList<>(); - 
totalTaskCount = queryingTask.size(); start = System.currentTimeMillis(); this.jobInfo = jobInfo; this.analysisManager = Env.getCurrentEnv().getAnalysisManager(); @@ -86,12 +82,14 @@ public class AnalysisJob { } protected void markOneTaskDone() { - queryFinishedTaskCount += 1; - if (queryFinishedTaskCount == totalTaskCount) { - writeBuf(); - updateTaskState(AnalysisState.FINISHED, "Cost time in sec: " - + (System.currentTimeMillis() - start) / 1000); - deregisterJob(); + if (queryingTask.isEmpty()) { + try { + writeBuf(); + updateTaskState(AnalysisState.FINISHED, "Cost time in sec: " + + (System.currentTimeMillis() - start) / 1000); + } finally { + deregisterJob(); + } } else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) { writeBuf(); } @@ -175,9 +173,12 @@ public class AnalysisJob { } public void taskFailed(BaseAnalysisTask task, String reason) { - updateTaskState(AnalysisState.FAILED, reason); - cancel(); - deregisterJob(); + try { + updateTaskState(AnalysisState.FAILED, reason); + cancel(); + } finally { + deregisterJob(); + } } public void cancel() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 24c612e101..8badad39d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -1071,4 +1071,8 @@ public class AnalysisManager implements Writable { public void removeJob(long id) { idToAnalysisJob.remove(id); } + + public boolean hasUnFinished() { + return !analysisJobIdToTaskMap.isEmpty(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java index e6f71cd591..ee07d52d3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java @@ -94,6 +94,8 @@ public class StatisticConstants { public static final int ANALYZE_TIMEOUT_IN_SEC = 43200; + public static final int SUBMIT_JOB_LIMIT = 5; + static { SYSTEM_DBS.add(SystemInfoService.DEFAULT_CLUSTER + ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 08487ff498..80afcb2c0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -50,7 +50,7 @@ public class StatisticsAutoCollector extends StatisticsCollector { public StatisticsAutoCollector() { super("Automatic Analyzer", - TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes), + TimeUnit.MINUTES.toMillis(Config.full_auto_analyze_simultaneously_running_task_num), new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java index 638db55398..4c77d42cfe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java @@ -35,6 +35,7 @@ public abstract class StatisticsCollector extends MasterDaemon { protected final AnalysisTaskExecutor analysisTaskExecutor; + protected int submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT; public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) { super(name, intervalMs); @@ -54,8 +55,8 @@ public abstract class StatisticsCollector extends MasterDaemon { if (Env.isCheckpointThread()) { return; } - - if 
(!analysisTaskExecutor.idle()) { + submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT; + if (Env.getCurrentEnv().getAnalysisManager().hasUnFinished()) { LOG.info("Analyze tasks those submitted in last time is not finished, skip"); return; } @@ -72,7 +73,9 @@ public abstract class StatisticsCollector extends MasterDaemon { // No statistics need to be collected or updated return; } - + if (submittedThisRound-- < 0) { + return; + } Map analysisTasks = new HashMap<>(); AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java index d4dedd1712..bca05d8299 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java @@ -44,7 +44,9 @@ public class AnalysisJobTest { } @Test - public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) { + public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo, + @Mocked OlapAnalysisTask olapAnalysisTask, + @Mocked OlapAnalysisTask olapAnalysisTask2) { AtomicInteger writeBufInvokeTimes = new AtomicInteger(); new MockUp() { @Mock @@ -63,9 +65,9 @@ public class AnalysisJobTest { AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask)); job.queryingTask = new HashSet<>(); job.queryingTask.add(olapAnalysisTask); + job.queryingTask.add(olapAnalysisTask2); job.queryFinished = new HashSet<>(); job.buf = new ArrayList<>(); - job.totalTaskCount = 20; // not all task finished nor cached limit exceed, shouldn't write job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData())); @@ -97,7 +99,6 @@ public class AnalysisJobTest { job.queryingTask.add(olapAnalysisTask); job.queryFinished = new HashSet<>(); 
job.buf = new ArrayList<>(); - job.totalTaskCount = 1; job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData())); // all task finished, should write and deregister this job @@ -132,7 +133,6 @@ public class AnalysisJobTest { for (int i = 0; i < StatisticsUtil.getInsertMergeCount(); i++) { job.buf.add(colStatsData); } - job.totalTaskCount = 100; job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData())); // cache limit exceed, should write them