Review notes (free text placed ahead of the first "diff --git" header):
  1. StatisticsCollector: the per-round guard now tests "<= 0", so at most SUBMIT_JOB_LIMIT jobs
     pass it in one round; the post-decrement "< 0" form let SUBMIT_JOB_LIMIT + 1 jobs through.
  2. StatisticsAutoCollector: the hunk that replaced Config.auto_check_statistics_in_minutes with
     Config.full_auto_analyze_simultaneously_running_task_num inside TimeUnit.MINUTES.toMillis(...)
     is dropped, because it passed a task-count setting as the daemon interval in minutes.
  3. Line breaks, indentation and blank context lines were rebuilt from the hunk line counts;
     verify them against the target files before applying.

diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
index acea77b1b7..41e4b6b317 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
@@ -30,6 +30,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -45,10 +46,6 @@ public class AnalysisJob {
 
     protected List buf;
 
-    protected int totalTaskCount;
-
-    protected int queryFinishedTaskCount;
-
     protected StmtExecutor stmtExecutor;
 
     protected boolean killed;
@@ -63,10 +60,9 @@ public class AnalysisJob {
         for (BaseAnalysisTask task : queryingTask) {
             task.job = this;
         }
-        this.queryingTask = new HashSet<>(queryingTask);
-        this.queryFinished = new HashSet<>();
+        this.queryingTask = Collections.synchronizedSet(new HashSet<>(queryingTask));
+        this.queryFinished = Collections.synchronizedSet(new HashSet<>());
         this.buf = new ArrayList<>();
-        totalTaskCount = queryingTask.size();
         start = System.currentTimeMillis();
         this.jobInfo = jobInfo;
         this.analysisManager = Env.getCurrentEnv().getAnalysisManager();
@@ -86,12 +82,14 @@ public class AnalysisJob {
     }
 
     protected void markOneTaskDone() {
-        queryFinishedTaskCount += 1;
-        if (queryFinishedTaskCount == totalTaskCount) {
-            writeBuf();
-            updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
-                    + (System.currentTimeMillis() - start) / 1000);
-            deregisterJob();
+        if (queryingTask.isEmpty()) {
+            try {
+                writeBuf();
+                updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
+                        + (System.currentTimeMillis() - start) / 1000);
+            } finally {
+                deregisterJob();
+            }
         } else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) {
             writeBuf();
         }
@@ -175,9 +173,12 @@ public class AnalysisJob {
     }
 
     public void taskFailed(BaseAnalysisTask task, String reason) {
-        updateTaskState(AnalysisState.FAILED, reason);
-        cancel();
-        deregisterJob();
+        try {
+            updateTaskState(AnalysisState.FAILED, reason);
+            cancel();
+        } finally {
+            deregisterJob();
+        }
     }
 
     public void cancel() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 24c612e101..8badad39d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -1071,4 +1071,8 @@ public class AnalysisManager implements Writable {
     public void removeJob(long id) {
         idToAnalysisJob.remove(id);
     }
+
+    public boolean hasUnFinished() {
+        return !analysisJobIdToTaskMap.isEmpty();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index e6f71cd591..ee07d52d3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -94,6 +94,8 @@ public class StatisticConstants {
 
     public static final int ANALYZE_TIMEOUT_IN_SEC = 43200;
 
+    public static final int SUBMIT_JOB_LIMIT = 5;
+
     static {
         SYSTEM_DBS.add(SystemInfoService.DEFAULT_CLUSTER + ClusterNamespace.CLUSTER_DELIMITER
                 + FeConstants.INTERNAL_DB_NAME);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
index 638db55398..4c77d42cfe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCollector.java
@@ -35,6 +35,7 @@ public abstract class StatisticsCollector extends MasterDaemon {
 
 
     protected final AnalysisTaskExecutor analysisTaskExecutor;
+    protected int submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
 
     public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) {
         super(name, intervalMs);
@@ -54,8 +55,8 @@ public abstract class StatisticsCollector extends MasterDaemon {
         if (Env.isCheckpointThread()) {
             return;
         }
-
-        if (!analysisTaskExecutor.idle()) {
+        submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
+        if (Env.getCurrentEnv().getAnalysisManager().hasUnFinished()) {
             LOG.info("Analyze tasks those submitted in last time is not finished, skip");
             return;
         }
@@ -72,7 +73,10 @@ public abstract class StatisticsCollector extends MasterDaemon {
             // No statistics need to be collected or updated
             return;
         }
-
+        // Stop once this round's submission budget (reset to SUBMIT_JOB_LIMIT each round) is used up.
+        if (submittedThisRound-- <= 0) {
+            return;
+        }
         Map analysisTasks = new HashMap<>();
         AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
         analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
index d4dedd1712..bca05d8299 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java
@@ -44,7 +44,9 @@ public class AnalysisJobTest {
     }
 
     @Test
-    public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) {
+    public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo,
+            @Mocked OlapAnalysisTask olapAnalysisTask,
+            @Mocked OlapAnalysisTask olapAnalysisTask2) {
         AtomicInteger writeBufInvokeTimes = new AtomicInteger();
         new MockUp() {
             @Mock
@@ -63,9 +65,9 @@ public class AnalysisJobTest {
         AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask));
         job.queryingTask = new HashSet<>();
         job.queryingTask.add(olapAnalysisTask);
+        job.queryingTask.add(olapAnalysisTask2);
         job.queryFinished = new HashSet<>();
         job.buf = new ArrayList<>();
-        job.totalTaskCount = 20;
 
         // not all task finished nor cached limit exceed, shouldn't write
         job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
@@ -97,7 +99,6 @@ public class AnalysisJobTest {
         job.queryingTask.add(olapAnalysisTask);
         job.queryFinished = new HashSet<>();
         job.buf = new ArrayList<>();
-        job.totalTaskCount = 1;
 
         job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
         // all task finished, should write and deregister this job
@@ -132,7 +133,6 @@ public class AnalysisJobTest {
         for (int i = 0; i < StatisticsUtil.getInsertMergeCount(); i++) {
             job.buf.add(colStatsData);
         }
-        job.totalTaskCount = 100;
 
         job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData()));
         // cache limit exceed, should write them