[fix](stats) Fix creating too many tasks on new env #27364

For huge datasets with many databases, many tables, and many columns, the auto collector might submit too many jobs at once, which would occupy too much FE memory.

In this PR, the number of jobs that can be submitted in each round is limited to at most 5.
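Roughly, the throttle works as sketched below. This is a stand-alone illustration, not Doris code: names such as `pendingJobs` and `submitOne` are made up, and only `SUBMIT_JOB_LIMIT` (5) and the "skip the round while earlier tasks are unfinished" gate come from this PR.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Minimal sketch of the per-round submit throttle (illustrative only).
public class SubmitThrottleSketch {
    static final int SUBMIT_JOB_LIMIT = 5;           // mirrors StatisticConstants.SUBMIT_JOB_LIMIT

    private final Queue<String> pendingJobs = new ArrayDeque<>();
    private boolean previousRoundUnfinished = false; // stands in for AnalysisManager.hasUnFinished()

    void collectOneRound() {
        if (previousRoundUnfinished) {
            return;                                   // skip the whole round; last batch not drained yet
        }
        int budget = SUBMIT_JOB_LIMIT;
        while (!pendingJobs.isEmpty() && budget-- > 0) {
            submitOne(pendingJobs.poll());            // at most 5 submissions per round
        }
    }

    private void submitOne(String job) {
        System.out.println("submitting " + job);
    }

    public static void main(String[] args) {
        SubmitThrottleSketch collector = new SubmitThrottleSketch();
        for (int i = 0; i < 8; i++) {
            collector.pendingJobs.add("job-" + i);
        }
        collector.collectOneRound();                  // prints job-0 .. job-4 only
    }
}
```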
AKIRA
2023-11-22 17:53:31 +09:00
committed by GitHub
parent 6a48abeb80
commit cfb6af295f
6 changed files with 34 additions and 24 deletions

AnalysisJob.java

@@ -30,6 +30,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -45,10 +46,6 @@ public class AnalysisJob {
protected List<ColStatsData> buf;
protected int totalTaskCount;
protected int queryFinishedTaskCount;
protected StmtExecutor stmtExecutor;
protected boolean killed;
@@ -63,10 +60,9 @@ public class AnalysisJob {
for (BaseAnalysisTask task : queryingTask) {
task.job = this;
}
this.queryingTask = new HashSet<>(queryingTask);
this.queryFinished = new HashSet<>();
this.queryingTask = Collections.synchronizedSet(new HashSet<>(queryingTask));
this.queryFinished = Collections.synchronizedSet(new HashSet<>());
this.buf = new ArrayList<>();
totalTaskCount = queryingTask.size();
start = System.currentTimeMillis();
this.jobInfo = jobInfo;
this.analysisManager = Env.getCurrentEnv().getAnalysisManager();
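A side note on the `Collections.synchronizedSet` change above: task completion is now detected via `queryingTask.isEmpty()` from callbacks that run on multiple executor threads, so the querying/finished sets need thread-safe mutation. A minimal stand-alone sketch of that pattern (illustrative only, not Doris code):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Several worker threads remove themselves from a shared set; the synchronized
// wrapper makes each remove()/isEmpty() call safe without external locking.
public class SyncSetSketch {
    public static void main(String[] args) throws InterruptedException {
        Set<Integer> querying = Collections.synchronizedSet(new HashSet<>());
        for (int i = 0; i < 4; i++) {
            querying.add(i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            final int taskId = i;
            pool.submit(() -> {
                querying.remove(taskId);   // each "task" marks itself done
                if (querying.isEmpty()) {  // emptiness signals that the last task finished
                    System.out.println("all tasks finished");
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```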
@@ -86,12 +82,14 @@ public class AnalysisJob {
}
protected void markOneTaskDone() {
queryFinishedTaskCount += 1;
if (queryFinishedTaskCount == totalTaskCount) {
writeBuf();
updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
+ (System.currentTimeMillis() - start) / 1000);
deregisterJob();
if (queryingTask.isEmpty()) {
try {
writeBuf();
updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
+ (System.currentTimeMillis() - start) / 1000);
} finally {
deregisterJob();
}
} else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) {
writeBuf();
}
@@ -175,9 +173,12 @@ public class AnalysisJob {
}
public void taskFailed(BaseAnalysisTask task, String reason) {
updateTaskState(AnalysisState.FAILED, reason);
cancel();
deregisterJob();
try {
updateTaskState(AnalysisState.FAILED, reason);
cancel();
} finally {
deregisterJob();
}
}
public void cancel() {
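The `try`/`finally` blocks added to `markOneTaskDone` and `taskFailed` ensure `deregisterJob()` runs even if updating state or cancelling throws, so a failing job is still removed from the manager. A tiny stand-alone illustration of the pattern (not Doris code):

```java
// The cleanup in finally runs even when the state update throws.
public class FinallyCleanupSketch {
    public static void main(String[] args) {
        try {
            failTask("simulated failure");
        } catch (IllegalStateException e) {
            System.out.println("caller saw: " + e.getMessage());
        }
    }

    // Mirrors the shape of taskFailed(): update state, then always deregister.
    static void failTask(String reason) {
        try {
            updateState(reason);   // may throw
        } finally {
            deregister();          // runs regardless, so the job is never left registered
        }
    }

    static void updateState(String reason) {
        throw new IllegalStateException(reason);
    }

    static void deregister() {
        System.out.println("job deregistered");
    }
}
```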

AnalysisManager.java

@@ -1071,4 +1071,8 @@ public class AnalysisManager implements Writable {
public void removeJob(long id) {
idToAnalysisJob.remove(id);
}
public boolean hasUnFinished() {
return !analysisJobIdToTaskMap.isEmpty();
}
}

StatisticConstants.java

@@ -94,6 +94,8 @@ public class StatisticConstants {
public static final int ANALYZE_TIMEOUT_IN_SEC = 43200;
public static final int SUBMIT_JOB_LIMIT = 5;
static {
SYSTEM_DBS.add(SystemInfoService.DEFAULT_CLUSTER
+ ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME);

StatisticsAutoCollector.java

@@ -50,7 +50,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
public StatisticsAutoCollector() {
super("Automatic Analyzer",
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes),
TimeUnit.MINUTES.toMillis(Config.full_auto_analyze_simultaneously_running_task_num),
new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num));
}

StatisticsCollector.java

@@ -35,6 +35,7 @@ public abstract class StatisticsCollector extends MasterDaemon {
protected final AnalysisTaskExecutor analysisTaskExecutor;
protected int submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
public StatisticsCollector(String name, long intervalMs, AnalysisTaskExecutor analysisTaskExecutor) {
super(name, intervalMs);
@@ -54,8 +55,8 @@ public abstract class StatisticsCollector extends MasterDaemon {
if (Env.isCheckpointThread()) {
return;
}
if (!analysisTaskExecutor.idle()) {
submittedThisRound = StatisticConstants.SUBMIT_JOB_LIMIT;
if (Env.getCurrentEnv().getAnalysisManager().hasUnFinished()) {
LOG.info("Analyze tasks those submitted in last time is not finished, skip");
return;
}
@@ -72,7 +73,9 @@ public abstract class StatisticsCollector extends MasterDaemon {
// No statistics need to be collected or updated
return;
}
if (submittedThisRound-- < 0) {
return;
}
Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);