[fix](statistics) Fix statistics related threads continuously spawn as doing checkpoint #16088
This commit is contained in:
@ -1850,7 +1850,7 @@ public class Config extends ConfigBase {
|
||||
* Used to determined how many statistics collection SQL could run simultaneously.
|
||||
*/
|
||||
@ConfField
|
||||
public static int statistics_simultaneously_running_job_num = 10;
|
||||
public static int statistics_simultaneously_running_task_num = 10;
|
||||
|
||||
/**
|
||||
* Internal table replica num, once set, user should promise the avaible BE is greater than this value,
|
||||
|
||||
@ -259,7 +259,6 @@ import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
@ -639,7 +638,9 @@ public class Env {
|
||||
this.mtmvJobManager = new MTMVJobManager();
|
||||
this.extMetaCacheMgr = new ExternalMetaCacheMgr();
|
||||
this.fqdnManager = new FQDNManager(systemInfo);
|
||||
this.analysisManager = new AnalysisManager();
|
||||
if (!isCheckpointCatalog) {
|
||||
this.analysisManager = new AnalysisManager();
|
||||
}
|
||||
}
|
||||
|
||||
public static void destroyCheckpoint() {
|
||||
|
||||
@ -38,8 +38,8 @@ public class AnalysisTaskExecutor extends Thread {
|
||||
private static final Logger LOG = LogManager.getLogger(AnalysisTaskExecutor.class);
|
||||
|
||||
private final ThreadPoolExecutor executors = ThreadPoolManager.newDaemonThreadPool(
|
||||
Config.statistics_simultaneously_running_job_num,
|
||||
Config.statistics_simultaneously_running_job_num, 0,
|
||||
Config.statistics_simultaneously_running_task_num,
|
||||
Config.statistics_simultaneously_running_task_num, 0,
|
||||
TimeUnit.DAYS, new LinkedBlockingQueue<>(),
|
||||
new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
|
||||
"Analysis Job Executor", true);
|
||||
@ -47,9 +47,9 @@ public class AnalysisTaskExecutor extends Thread {
|
||||
private final AnalysisTaskScheduler taskScheduler;
|
||||
|
||||
private final BlockingCounter blockingCounter =
|
||||
new BlockingCounter(Config.statistics_simultaneously_running_job_num);
|
||||
new BlockingCounter(Config.statistics_simultaneously_running_task_num);
|
||||
|
||||
private final BlockingQueue<AnalysisTaskWrapper> jobQueue =
|
||||
private final BlockingQueue<AnalysisTaskWrapper> taskQueue =
|
||||
new PriorityBlockingQueue<AnalysisTaskWrapper>(20,
|
||||
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
|
||||
|
||||
@ -60,11 +60,11 @@ public class AnalysisTaskExecutor extends Thread {
|
||||
@Override
|
||||
public void run() {
|
||||
fetchAndExecute();
|
||||
cancelExpiredJob();
|
||||
cancelExpiredTask();
|
||||
}
|
||||
|
||||
private void cancelExpiredJob() {
|
||||
String name = "Expired Analysis Job Killer";
|
||||
private void cancelExpiredTask() {
|
||||
String name = "Expired Analysis Task Killer";
|
||||
Thread t = new Thread(this::doCancelExpiredJob, name);
|
||||
t.setDaemon(true);
|
||||
t.start();
|
||||
@ -73,7 +73,7 @@ public class AnalysisTaskExecutor extends Thread {
|
||||
private void doCancelExpiredJob() {
|
||||
for (;;) {
|
||||
try {
|
||||
AnalysisTaskWrapper taskWrapper = jobQueue.take();
|
||||
AnalysisTaskWrapper taskWrapper = taskQueue.take();
|
||||
try {
|
||||
long timeout = StatisticConstants.STATISTICS_TASKS_TIMEOUT_IN_MS;
|
||||
taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
|
||||
@ -95,7 +95,7 @@ public class AnalysisTaskExecutor extends Thread {
|
||||
LOG.warn(throwable);
|
||||
}
|
||||
}
|
||||
}, "Analysis Job Submitter");
|
||||
}, "Analysis Task Submitter");
|
||||
t.setDaemon(true);
|
||||
t.start();
|
||||
}
|
||||
@ -119,6 +119,6 @@ public class AnalysisTaskExecutor extends Thread {
|
||||
}
|
||||
|
||||
public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
|
||||
jobQueue.put(wrapper);
|
||||
taskQueue.put(wrapper);
|
||||
}
|
||||
}
|
||||
|
||||
@ -74,7 +74,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
|
||||
};
|
||||
|
||||
AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler);
|
||||
BlockingQueue<AnalysisTaskWrapper> b = Deencapsulation.getField(analysisTaskExecutor, "jobQueue");
|
||||
BlockingQueue<AnalysisTaskWrapper> b = Deencapsulation.getField(analysisTaskExecutor, "taskQueue");
|
||||
AnalysisTaskWrapper analysisTaskWrapper = new AnalysisTaskWrapper(analysisTaskExecutor, analysisJob);
|
||||
Deencapsulation.setField(analysisTaskWrapper, "startTime", 5);
|
||||
b.put(analysisTaskWrapper);
|
||||
|
||||
Reference in New Issue
Block a user