From 908dff551aba955073e9692f6ea6f0f00718cd99 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Thu, 7 Mar 2024 21:25:51 +0800 Subject: [PATCH] [fix](statistics)Add synchronize for modify analysisTaskInfoMap and analysisJobInfoMap. #31940 --- .../doris/statistics/AnalysisManager.java | 48 ++++++++++++------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index b265b88f70..6bdf6fdb77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -591,14 +591,16 @@ public class AnalysisManager implements Writable { tbl = StatisticsUtil.findTable(tblName.getCtl(), tblName.getDb(), tblName.getTbl()); } long tblId = tbl == null ? -1 : tbl.getId(); - return analysisInfos.stream() + synchronized (analysisInfos) { + return analysisInfos.stream() .filter(a -> stmt.getJobId() == 0 || a.jobId == stmt.getJobId()) .filter(a -> state == null || a.state.equals(AnalysisState.valueOf(state))) .filter(a -> tblName == null || a.tblId == tblId) .filter(a -> stmt.isAuto() && a.jobType.equals(JobType.SYSTEM) - || !stmt.isAuto() && a.jobType.equals(JobType.MANUAL)) + || !stmt.isAuto() && a.jobType.equals(JobType.MANUAL)) .sorted(Comparator.comparingLong(a -> a.jobId)) .collect(Collectors.toList()); + } } public String getJobProgress(long jobId) { @@ -796,31 +798,39 @@ public class AnalysisManager implements Writable { } public void replayCreateAnalysisJob(AnalysisInfo jobInfo) { - while (analysisJobInfoMap.size() >= Config.analyze_record_limit) { - analysisJobInfoMap.remove(analysisJobInfoMap.pollFirstEntry().getKey()); + synchronized (analysisJobInfoMap) { + while (analysisJobInfoMap.size() >= Config.analyze_record_limit) { + analysisJobInfoMap.remove(analysisJobInfoMap.pollFirstEntry().getKey()); + } + if (jobInfo.message != null && jobInfo.message.length() >= StatisticConstants.MSG_LEN_UPPER_BOUND) { + jobInfo.message = jobInfo.message.substring(0, StatisticConstants.MSG_LEN_UPPER_BOUND); + } + this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo); } - if (jobInfo.message != null && jobInfo.message.length() >= StatisticConstants.MSG_LEN_UPPER_BOUND) { - jobInfo.message = jobInfo.message.substring(0, StatisticConstants.MSG_LEN_UPPER_BOUND); - } - this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo); } public void replayCreateAnalysisTask(AnalysisInfo taskInfo) { - while (analysisTaskInfoMap.size() >= Config.analyze_record_limit) { - analysisTaskInfoMap.remove(analysisTaskInfoMap.pollFirstEntry().getKey()); + synchronized (analysisTaskInfoMap) { + while (analysisTaskInfoMap.size() >= Config.analyze_record_limit) { + analysisTaskInfoMap.remove(analysisTaskInfoMap.pollFirstEntry().getKey()); + } + if (taskInfo.message != null && taskInfo.message.length() >= StatisticConstants.MSG_LEN_UPPER_BOUND) { + taskInfo.message = taskInfo.message.substring(0, StatisticConstants.MSG_LEN_UPPER_BOUND); + } + this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo); } - if (taskInfo.message != null && taskInfo.message.length() >= StatisticConstants.MSG_LEN_UPPER_BOUND) { - taskInfo.message = taskInfo.message.substring(0, StatisticConstants.MSG_LEN_UPPER_BOUND); - } - this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo); } public void replayDeleteAnalysisJob(AnalyzeDeletionLog log) { - this.analysisJobInfoMap.remove(log.id); + synchronized (analysisJobInfoMap) { + this.analysisJobInfoMap.remove(log.id); + } } public void replayDeleteAnalysisTask(AnalyzeDeletionLog log) { - this.analysisTaskInfoMap.remove(log.id); + synchronized (analysisTaskInfoMap) { + this.analysisTaskInfoMap.remove(log.id); + } } private static class SyncTaskCollection { @@ -892,8 +902,10 @@ public class AnalysisManager implements Writable { } public void removeAll(List analysisInfos) { - for (AnalysisInfo analysisInfo : analysisInfos) { - analysisTaskInfoMap.remove(analysisInfo.taskId); + synchronized (analysisTaskInfoMap) { + for (AnalysisInfo analysisInfo : analysisInfos) { + analysisTaskInfoMap.remove(analysisInfo.taskId); + } } }