[fix](statistics)Add synchronize for modify analysisTaskInfoMap and analysisJobInfoMap. #31940

This commit is contained in:
Jibing-Li
2024-03-07 21:25:51 +08:00
committed by yiguolei
parent 0da010603e
commit 908dff551a

View File

@ -591,14 +591,16 @@ public class AnalysisManager implements Writable {
tbl = StatisticsUtil.findTable(tblName.getCtl(), tblName.getDb(), tblName.getTbl());
}
long tblId = tbl == null ? -1 : tbl.getId();
return analysisInfos.stream()
synchronized (analysisInfos) {
return analysisInfos.stream()
.filter(a -> stmt.getJobId() == 0 || a.jobId == stmt.getJobId())
.filter(a -> state == null || a.state.equals(AnalysisState.valueOf(state)))
.filter(a -> tblName == null || a.tblId == tblId)
.filter(a -> stmt.isAuto() && a.jobType.equals(JobType.SYSTEM)
|| !stmt.isAuto() && a.jobType.equals(JobType.MANUAL))
|| !stmt.isAuto() && a.jobType.equals(JobType.MANUAL))
.sorted(Comparator.comparingLong(a -> a.jobId))
.collect(Collectors.toList());
}
}
public String getJobProgress(long jobId) {
@ -796,31 +798,39 @@ public class AnalysisManager implements Writable {
}
public void replayCreateAnalysisJob(AnalysisInfo jobInfo) {
while (analysisJobInfoMap.size() >= Config.analyze_record_limit) {
analysisJobInfoMap.remove(analysisJobInfoMap.pollFirstEntry().getKey());
synchronized (analysisJobInfoMap) {
while (analysisJobInfoMap.size() >= Config.analyze_record_limit) {
analysisJobInfoMap.remove(analysisJobInfoMap.pollFirstEntry().getKey());
}
if (jobInfo.message != null && jobInfo.message.length() >= StatisticConstants.MSG_LEN_UPPER_BOUND) {
jobInfo.message = jobInfo.message.substring(0, StatisticConstants.MSG_LEN_UPPER_BOUND);
}
this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo);
}
if (jobInfo.message != null && jobInfo.message.length() >= StatisticConstants.MSG_LEN_UPPER_BOUND) {
jobInfo.message = jobInfo.message.substring(0, StatisticConstants.MSG_LEN_UPPER_BOUND);
}
this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo);
}
public void replayCreateAnalysisTask(AnalysisInfo taskInfo) {
while (analysisTaskInfoMap.size() >= Config.analyze_record_limit) {
analysisTaskInfoMap.remove(analysisTaskInfoMap.pollFirstEntry().getKey());
synchronized (analysisTaskInfoMap) {
while (analysisTaskInfoMap.size() >= Config.analyze_record_limit) {
analysisTaskInfoMap.remove(analysisTaskInfoMap.pollFirstEntry().getKey());
}
if (taskInfo.message != null && taskInfo.message.length() >= StatisticConstants.MSG_LEN_UPPER_BOUND) {
taskInfo.message = taskInfo.message.substring(0, StatisticConstants.MSG_LEN_UPPER_BOUND);
}
this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo);
}
if (taskInfo.message != null && taskInfo.message.length() >= StatisticConstants.MSG_LEN_UPPER_BOUND) {
taskInfo.message = taskInfo.message.substring(0, StatisticConstants.MSG_LEN_UPPER_BOUND);
}
this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo);
}
public void replayDeleteAnalysisJob(AnalyzeDeletionLog log) {
this.analysisJobInfoMap.remove(log.id);
synchronized (analysisJobInfoMap) {
this.analysisJobInfoMap.remove(log.id);
}
}
public void replayDeleteAnalysisTask(AnalyzeDeletionLog log) {
this.analysisTaskInfoMap.remove(log.id);
synchronized (analysisTaskInfoMap) {
this.analysisTaskInfoMap.remove(log.id);
}
}
private static class SyncTaskCollection {
@ -892,8 +902,10 @@ public class AnalysisManager implements Writable {
}
public void removeAll(List<AnalysisInfo> analysisInfos) {
for (AnalysisInfo analysisInfo : analysisInfos) {
analysisTaskInfoMap.remove(analysisInfo.taskId);
synchronized (analysisTaskInfoMap) {
for (AnalysisInfo analysisInfo : analysisInfos) {
analysisTaskInfoMap.remove(analysisInfo.taskId);
}
}
}