[enhancement](stats) Limit analyze info count (#25576)
Each analyze job info and task info would not exceed 20000 after this PR. User could adjust this by FE conf param: analyze_record_limit
This commit is contained in:
@ -2213,7 +2213,7 @@ public class Config extends ConfigBase {
|
||||
"控制统计信息的自动触发作业执行记录的持久化行数",
|
||||
"Determine the persist number of automatic triggered analyze job execution status"
|
||||
})
|
||||
public static long auto_analyze_job_record_count = 20000;
|
||||
public static long analyze_record_limit = 20000;
|
||||
|
||||
@ConfField(description = {
|
||||
"Auto Buckets中最小的buckets数目",
|
||||
|
||||
@ -86,6 +86,7 @@ import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.StringJoiner;
|
||||
@ -112,10 +113,12 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
private AnalysisTaskExecutor taskExecutor;
|
||||
|
||||
// Store task information in metadata.
|
||||
private final Map<Long, AnalysisInfo> analysisTaskInfoMap = Collections.synchronizedMap(new TreeMap<>());
|
||||
private final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
|
||||
Collections.synchronizedNavigableMap(new TreeMap<>());
|
||||
|
||||
// Store job information in metadata
|
||||
private final Map<Long, AnalysisInfo> analysisJobInfoMap = Collections.synchronizedMap(new TreeMap<>());
|
||||
// Store job information in metadata.
|
||||
private final NavigableMap<Long, AnalysisInfo> analysisJobInfoMap =
|
||||
Collections.synchronizedNavigableMap(new TreeMap<>());
|
||||
|
||||
// Tracking system submitted job, keep in mem only
|
||||
protected final Map<Long, AnalysisInfo> systemJobInfoMap = new ConcurrentHashMap<>();
|
||||
@ -819,10 +822,16 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
}
|
||||
|
||||
public void replayCreateAnalysisJob(AnalysisInfo jobInfo) {
|
||||
while (analysisJobInfoMap.size() >= Config.analyze_record_limit) {
|
||||
analysisJobInfoMap.remove(analysisJobInfoMap.pollFirstEntry().getKey());
|
||||
}
|
||||
this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo);
|
||||
}
|
||||
|
||||
public void replayCreateAnalysisTask(AnalysisInfo taskInfo) {
|
||||
while (analysisTaskInfoMap.size() >= Config.analyze_record_limit) {
|
||||
analysisTaskInfoMap.remove(analysisTaskInfoMap.pollFirstEntry().getKey());
|
||||
}
|
||||
this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo);
|
||||
}
|
||||
|
||||
@ -1075,7 +1084,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
|
||||
protected SimpleQueue<AnalysisInfo> createSimpleQueue(Collection<AnalysisInfo> collection,
|
||||
AnalysisManager analysisManager) {
|
||||
return new SimpleQueue<>(Config.auto_analyze_job_record_count,
|
||||
return new SimpleQueue<>(Config.analyze_record_limit,
|
||||
a -> {
|
||||
// FE is not ready when replaying log and operations triggered by replaying
|
||||
// shouldn't be logged again.
|
||||
|
||||
Reference in New Issue
Block a user