[Code Cleanup]Use ThreadPoolManager to manage some native thread (#3997)
Now, FE use ThreadPoolManager to manage and monitor all Thread, but there are still some threads are not managed. And FE use `Timer` class to do some scheduler task, but `Timer` class has some problem and is out of date, It should replace by ScheduledThreadPool.
This commit is contained in:
@ -30,6 +30,7 @@ import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
@ -113,6 +114,13 @@ public class ThreadPoolManager {
|
||||
return threadPool;
|
||||
}
|
||||
|
||||
public static ScheduledThreadPoolExecutor newScheduledThreadPool(int maxNumThread, String poolName) {
|
||||
ThreadFactory threadFactory = namedThreadFactory(poolName);
|
||||
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(maxNumThread, threadFactory);
|
||||
nameToThreadPoolMap.put(poolName, scheduledThreadPoolExecutor);
|
||||
return scheduledThreadPoolExecutor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a thread factory that names threads with a prefix and also sets the threads to daemon.
|
||||
*/
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.common.publish;
|
||||
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -27,7 +28,7 @@ import java.util.concurrent.TimeUnit;
|
||||
public class FixedTimePublisher {
|
||||
private static FixedTimePublisher INSTANCE;
|
||||
|
||||
private ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
|
||||
private ScheduledThreadPoolExecutor scheduler = ThreadPoolManager.newScheduledThreadPool(1, "Fixed-Time-Publisher");
|
||||
private ClusterStatePublisher publisher;
|
||||
|
||||
public FixedTimePublisher(ClusterStatePublisher publisher) {
|
||||
|
||||
@ -22,6 +22,7 @@ import org.apache.doris.alter.AlterJob.JobType;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.load.EtlJobType;
|
||||
import org.apache.doris.load.loadv2.JobState;
|
||||
import org.apache.doris.load.loadv2.LoadManager;
|
||||
@ -42,7 +43,8 @@ import org.apache.logging.log4j.Logger;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.SortedMap;
|
||||
import java.util.Timer;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public final class MetricRepo {
|
||||
@ -84,7 +86,7 @@ public final class MetricRepo {
|
||||
public static GaugeMetricImpl<Double> GAUGE_QUERY_ERR_RATE;
|
||||
public static GaugeMetricImpl<Long> GAUGE_MAX_TABLET_COMPACTION_SCORE;
|
||||
|
||||
private static Timer metricTimer = new Timer();
|
||||
private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newScheduledThreadPool(1, "Metric-Timer-Pool");
|
||||
private static MetricCalculator metricCalculator = new MetricCalculator();
|
||||
|
||||
public static synchronized void init() {
|
||||
@ -249,7 +251,7 @@ public final class MetricRepo {
|
||||
isInit.set(true);
|
||||
|
||||
if (Config.enable_metric_calculator) {
|
||||
metricTimer.scheduleAtFixedRate(metricCalculator, 0, 15 * 1000 /* 15s */);
|
||||
metricTimer.scheduleAtFixedRate(metricCalculator, 0, 15 * 1000L, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.mysql;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.ConnectScheduler;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -27,6 +28,8 @@ import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
// MySQL protocol network service
|
||||
public class MysqlServer {
|
||||
@ -37,7 +40,8 @@ public class MysqlServer {
|
||||
private ServerSocketChannel serverChannel = null;
|
||||
private ConnectScheduler scheduler = null;
|
||||
// used to accept connect request from client
|
||||
private Thread listener;
|
||||
private ThreadPoolExecutor listener;
|
||||
private Future listenerFuture;
|
||||
|
||||
public MysqlServer(int port, ConnectScheduler scheduler) {
|
||||
this.port = port;
|
||||
@ -66,9 +70,9 @@ public class MysqlServer {
|
||||
}
|
||||
|
||||
// start accept thread
|
||||
listener = new Thread(new Listener(), "MySQL Protocol Listener");
|
||||
listener = ThreadPoolManager.newDaemonCacheThreadPool(1, "MySQL-Protocol-Listener");
|
||||
running = true;
|
||||
listener.start();
|
||||
listenerFuture = listener.submit(new Listener());
|
||||
|
||||
return true;
|
||||
}
|
||||
@ -87,8 +91,8 @@ public class MysqlServer {
|
||||
|
||||
public void join() {
|
||||
try {
|
||||
listener.join();
|
||||
} catch (InterruptedException e) {
|
||||
listenerFuture.get();
|
||||
} catch (Exception e) {
|
||||
// just return
|
||||
LOG.warn("Join MySQL server exception.", e);
|
||||
}
|
||||
@ -98,7 +102,7 @@ public class MysqlServer {
|
||||
@Override
|
||||
public void run() {
|
||||
while (running && serverChannel.isOpen()) {
|
||||
SocketChannel clientChannel = null;
|
||||
SocketChannel clientChannel;
|
||||
try {
|
||||
clientChannel = serverChannel.accept();
|
||||
if (clientChannel == null) {
|
||||
|
||||
@ -32,9 +32,10 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
// 查询请求的调度器
|
||||
@ -53,14 +54,14 @@ public class ConnectScheduler {
|
||||
// 1. If use a scheduler, the task maybe a huge number when query is messy.
|
||||
// Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler.
|
||||
// 2. Use a thread to poll maybe lose some accurate, but is enough to us.
|
||||
private Timer checkTimer;
|
||||
private ScheduledExecutorService checkTimer = ThreadPoolManager.newScheduledThreadPool(1,
|
||||
"Connect-Scheduler-Check-Timer");
|
||||
|
||||
public ConnectScheduler(int maxConnections) {
|
||||
this.maxConnections = maxConnections;
|
||||
numberConnection = 0;
|
||||
nextConnectionId = new AtomicInteger(0);
|
||||
checkTimer = new Timer("ConnectScheduler Check Timer", true);
|
||||
checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000);
|
||||
checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
private class TimeoutChecker extends TimerTask {
|
||||
|
||||
@ -19,13 +19,13 @@ package org.apache.doris.task;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -37,7 +37,7 @@ public class MasterTaskExecutor {
|
||||
private Map<Long, Future<?>> runningTasks;
|
||||
|
||||
public MasterTaskExecutor(int threadNum) {
|
||||
executor = Executors.newScheduledThreadPool(threadNum);
|
||||
executor = ThreadPoolManager.newScheduledThreadPool(threadNum, "Master-Task-Executor-Pool");
|
||||
runningTasks = Maps.newHashMap();
|
||||
executor.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user