[enhancement](stats) limit bq cap size for analyze task (#27685)
This commit is contained in:
@ -248,7 +248,8 @@ public class AnalysisManager implements Writable {
|
||||
|
||||
public AnalysisManager() {
|
||||
if (!Env.isCheckpointThread()) {
|
||||
this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num);
|
||||
this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num,
|
||||
Integer.MAX_VALUE);
|
||||
this.statisticsCache = new StatisticsCache();
|
||||
}
|
||||
}
|
||||
|
||||
@ -43,11 +43,15 @@ public class AnalysisTaskExecutor {
|
||||
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
|
||||
|
||||
public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
|
||||
this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize) {
|
||||
if (!Env.isCheckpointThread()) {
|
||||
executors = ThreadPoolManager.newDaemonThreadPool(
|
||||
simultaneouslyRunningTaskNum,
|
||||
simultaneouslyRunningTaskNum, 0,
|
||||
TimeUnit.DAYS, new LinkedBlockingQueue<>(),
|
||||
TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize),
|
||||
new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
|
||||
"Analysis Job Executor", true);
|
||||
cancelExpiredTask();
|
||||
|
||||
@ -96,7 +96,7 @@ public class StatisticConstants {
|
||||
|
||||
public static final int ANALYZE_TIMEOUT_IN_SEC = 43200;
|
||||
|
||||
public static final int SUBMIT_JOB_LIMIT = 5;
|
||||
public static final int TASK_QUEUE_CAP = 10;
|
||||
|
||||
public static final int AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD = 70;
|
||||
|
||||
|
||||
@ -52,7 +52,8 @@ public class StatisticsAutoCollector extends StatisticsCollector {
|
||||
public StatisticsAutoCollector() {
|
||||
super("Automatic Analyzer",
|
||||
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes),
|
||||
new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num));
|
||||
new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num,
|
||||
StatisticConstants.TASK_QUEUE_CAP));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -35,8 +35,6 @@ public abstract class StatisticsCollector extends MasterDaemon {
|
||||
|
||||
protected final AnalysisTaskExecutor analysisTaskExecutor;
|
||||
|
||||
protected int submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
|
||||
|
||||
public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) {
|
||||
super(name, intervalMs);
|
||||
this.analysisTaskExecutor = analysisTaskExecutor;
|
||||
@ -54,7 +52,6 @@ public abstract class StatisticsCollector extends MasterDaemon {
|
||||
if (Env.isCheckpointThread()) {
|
||||
return;
|
||||
}
|
||||
submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
|
||||
if (Env.getCurrentEnv().getAnalysisManager().hasUnFinished()) {
|
||||
LOG.info("Analyze tasks those submitted in last time is not finished, skip");
|
||||
return;
|
||||
@ -72,9 +69,6 @@ public abstract class StatisticsCollector extends MasterDaemon {
|
||||
// No statistics need to be collected or updated
|
||||
return;
|
||||
}
|
||||
if (submittedThisRound-- < 0) {
|
||||
return;
|
||||
}
|
||||
Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
|
||||
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
|
||||
analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
|
||||
|
||||
Reference in New Issue
Block a user