[Thread Resource Leak] Fix thread resource leak after checkpoint catalog destroyed (#4049)

This PR is mainly to fix thread resource leak, and then add some notice
to use newDaemonScheduledThreadPool api in ThreadPoolManager.
This commit is contained in:
caiconghui
2020-07-16 09:38:39 -05:00
committed by GitHub
parent 3a4a38c2fc
commit db50c19aad
18 changed files with 83 additions and 117 deletions

View File

@ -111,6 +111,7 @@ import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.Daemon;
@ -460,6 +461,11 @@ public class Catalog {
}
private Catalog() {
this(false);
}
// if isCheckpointCatalog is true, it means that we should not collect thread pool metric
private Catalog(boolean isCheckpointCatalog) {
this.idToDb = new ConcurrentHashMap<>();
this.fullNameToDb = new ConcurrentHashMap<>();
this.load = new Load();
@ -489,7 +495,7 @@ public class Catalog {
this.masterIp = "";
this.systemInfo = new SystemInfoService();
this.heartbeatMgr = new HeartbeatMgr(systemInfo);
this.heartbeatMgr = new HeartbeatMgr(systemInfo, !isCheckpointCatalog);
this.tabletInvertedIndex = new TabletInvertedIndex();
this.colocateTableIndex = new ColocateTableIndex();
this.recycleBin = new CatalogRecycleBin();
@ -503,7 +509,7 @@ public class Catalog {
this.isDefaultClusterCreated = false;
this.pullLoadJobMgr = new PullLoadJobMgr();
this.pullLoadJobMgr = new PullLoadJobMgr(!isCheckpointCatalog);
this.brokerMgr = new BrokerMgr();
this.resourceMgr = new ResourceMgr();
@ -522,7 +528,7 @@ public class Catalog {
this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat);
this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat);
this.loadTaskScheduler = new MasterTaskExecutor(Config.async_load_task_pool_size);
this.loadTaskScheduler = new MasterTaskExecutor("load_task_scheduler", Config.async_load_task_pool_size, !isCheckpointCatalog);
this.loadJobScheduler = new LoadJobScheduler();
this.loadManager = new LoadManager(loadJobScheduler);
this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager);
@ -555,7 +561,7 @@ public class Catalog {
// only checkpoint thread it self will goes here.
// so no need to care about the thread safe.
if (CHECKPOINT == null) {
CHECKPOINT = new Catalog();
CHECKPOINT = new Catalog(true);
}
return CHECKPOINT;
} else {
@ -1206,6 +1212,8 @@ public class Catalog {
String msg = "master finished to replay journal, can write now.";
Util.stdoutWithTime(msg);
LOG.info(msg);
// for master, there are some new thread pools need to register metric
ThreadPoolManager.registerAllThreadPoolMetric();
}
/*
@ -1245,6 +1253,7 @@ public class Catalog {
LoadChecker.init(Config.load_checker_interval_second * 1000L);
LoadChecker.startAll();
// New load scheduler
loadTaskScheduler.start();
loadManager.prepareJobs();
loadJobScheduler.start();
loadTimeoutChecker.start();

View File

@ -39,7 +39,7 @@ import java.util.concurrent.TimeUnit;
/**
* ThreadPoolManager is a helper class for construct daemon thread pool with limit thread and memory resource.
* thread names in thread pool are formatted as poolName-ID, where ID is a unique, sequentially assigned integer.
* it provide three functions to construct thread pool now.
* it provide four functions to construct thread pool now.
*
* 1. newDaemonCacheThreadPool
* Wrapper over newCachedThreadPool with additional maxNumThread limit.
@ -47,6 +47,8 @@ import java.util.concurrent.TimeUnit;
* Wrapper over newCachedThreadPool with additional blocking queue capacity limit.
* 3. newDaemonThreadPool
* Wrapper over ThreadPoolExecutor, user can use it to construct thread pool more flexibly.
* 4. newDaemonScheduledThreadPool
* Wrapper over ScheduledThreadPoolExecutor, but without delay task num limit and thread num limit now(NOTICE).
*
* All thread pool constructed by ThreadPoolManager will be added to the nameToThreadPoolMap,
* so the thread pool name in fe must be unique.
@ -66,6 +68,7 @@ public class ThreadPoolManager {
for (Map.Entry<String, ThreadPoolExecutor> entry : nameToThreadPoolMap.entrySet()) {
registerThreadPoolMetric(entry.getKey(), entry.getValue());
}
nameToThreadPoolMap.clear();
}
public static void registerThreadPoolMetric(String poolName, ThreadPoolExecutor threadPool) {
@ -92,13 +95,14 @@ public class ThreadPoolManager {
}
}
public static ThreadPoolExecutor newDaemonCacheThreadPool(int maxNumThread, String poolName) {
return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue(), new LogDiscardPolicy(poolName), poolName);
public static ThreadPoolExecutor newDaemonCacheThreadPool(int maxNumThread, String poolName, boolean needRegisterMetric) {
return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new SynchronousQueue(),
new LogDiscardPolicy(poolName), poolName, needRegisterMetric);
}
public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName) {
public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName, boolean needRegisterMetric) {
return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME ,TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize),
new BlockedPolicy(poolName, 60), poolName);
new BlockedPolicy(poolName, 60), poolName, needRegisterMetric);
}
public static ThreadPoolExecutor newDaemonThreadPool(int corePoolSize,
@ -107,17 +111,25 @@ public class ThreadPoolManager {
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler,
String poolName) {
String poolName,
boolean needRegisterMetric) {
ThreadFactory threadFactory = namedThreadFactory(poolName);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
nameToThreadPoolMap.put(poolName, threadPool);
if (needRegisterMetric) {
nameToThreadPoolMap.put(poolName, threadPool);
}
return threadPool;
}
public static ScheduledThreadPoolExecutor newScheduledThreadPool(int maxNumThread, String poolName) {
// Now, we have no delay task num limit and thread num limit in ScheduledThreadPoolExecutor,
// so it may cause oom when there are too many delay tasks or threads in ScheduledThreadPoolExecutor
// Please use this api only for scheduling short task at fix rate.
public static ScheduledThreadPoolExecutor newDaemonScheduledThreadPool(int corePoolSize, String poolName, boolean needRegisterMetric) {
ThreadFactory threadFactory = namedThreadFactory(poolName);
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(maxNumThread, threadFactory);
nameToThreadPoolMap.put(poolName, scheduledThreadPoolExecutor);
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
if (needRegisterMetric) {
nameToThreadPoolMap.put(poolName, scheduledThreadPoolExecutor);
}
return scheduledThreadPoolExecutor;
}

View File

@ -100,7 +100,7 @@ public class ThriftServer {
TThreadedSelectorServer.Args args =
new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port, Config.thrift_client_timeout_ms)).protocolFactory(
new TBinaryProtocol.Factory()).processor(processor);
ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool");
ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool", true);
args.executorService(threadPoolExecutor);
server = new TThreadedSelectorServer(args);
}
@ -114,7 +114,7 @@ public class ThriftServer {
TThreadPoolServer.Args serverArgs =
new TThreadPoolServer.Args(new TServerSocket(socketTransportArgs)).protocolFactory(
new TBinaryProtocol.Factory()).processor(processor);
ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool");
ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(Config.thrift_server_max_worker_threads, "thrift-server-pool", true);
serverArgs.executorService(threadPoolExecutor);
server = new TThreadPoolServer(serverArgs);
}

View File

@ -41,7 +41,7 @@ public class ClusterStatePublisher {
private static final Logger LOG = LogManager.getLogger(ClusterStatePublisher.class);
private static ClusterStatePublisher INSTANCE;
private ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(5, 256, "cluster-state-publisher");
private ExecutorService executor = ThreadPoolManager.newDaemonFixedThreadPool(5, 256, "cluster-state-publisher", true);
private SystemInfoService clusterInfoService;

View File

@ -1,72 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.publish;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// Fixed time scheduled publisher.
// You can register your routine publish here.
public class FixedTimePublisher {
private static FixedTimePublisher INSTANCE;
private ScheduledThreadPoolExecutor scheduler = ThreadPoolManager.newScheduledThreadPool(1, "Fixed-Time-Publisher");
private ClusterStatePublisher publisher;
public FixedTimePublisher(ClusterStatePublisher publisher) {
this.publisher = publisher;
}
public static FixedTimePublisher getInstance() {
if (INSTANCE == null) {
INSTANCE = new FixedTimePublisher(ClusterStatePublisher.getInstance());
}
return INSTANCE;
}
public void register(Callback callback, long intervalMs) {
scheduler.scheduleAtFixedRate(new Worker(callback), 0, intervalMs, TimeUnit.MILLISECONDS);
}
private class Worker implements Runnable {
private Callback callback;
public Worker(Callback callback) {
this.callback = callback;
}
@Override
public void run() {
ClusterStateUpdate.Builder builder = ClusterStateUpdate.builder();
builder.addUpdate(callback.getTopicUpdate());
ClusterStateUpdate state = builder.build();
Listener listener = Listeners.nullToNoOpListener(callback.getListener());
publisher.publish(state, listener, Config.meta_publish_timeout_ms);
}
}
public static interface Callback {
public TopicUpdate getTopicUpdate();
public Listener getListener();
}
}

View File

@ -53,10 +53,10 @@ public final class ExportChecker extends MasterDaemon {
checkers.put(JobState.EXPORTING, new ExportChecker(JobState.EXPORTING, intervalMs));
int poolSize = Config.export_running_job_num_limit == 0 ? 5 : Config.export_running_job_num_limit;
MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor(poolSize);
MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor("export_pending_job", poolSize, true);
executors.put(JobState.PENDING, pendingTaskExecutor);
MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor(poolSize);
MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor("export_exporting_job", poolSize, true);
executors.put(JobState.EXPORTING, exportingTaskExecutor);
}
@ -64,6 +64,9 @@ public final class ExportChecker extends MasterDaemon {
for (ExportChecker exportChecker : checkers.values()) {
exportChecker.start();
}
for (MasterTaskExecutor masterTaskExecutor : executors.values()) {
masterTaskExecutor.start();
}
}
@Override

View File

@ -92,14 +92,14 @@ public class LoadChecker extends MasterDaemon {
Map<TPriority, MasterTaskExecutor> pendingPriorityMap = Maps.newHashMap();
pendingPriorityMap.put(TPriority.NORMAL,
new MasterTaskExecutor(Config.load_pending_thread_num_normal_priority));
new MasterTaskExecutor("load_pending_thread_num_normal_priority", Config.load_pending_thread_num_normal_priority, true));
pendingPriorityMap.put(TPriority.HIGH,
new MasterTaskExecutor(Config.load_pending_thread_num_high_priority));
new MasterTaskExecutor("load_pending_thread_num_high_priority", Config.load_pending_thread_num_high_priority, true));
executors.put(JobState.PENDING, pendingPriorityMap);
Map<TPriority, MasterTaskExecutor> etlPriorityMap = Maps.newHashMap();
etlPriorityMap.put(TPriority.NORMAL, new MasterTaskExecutor(Config.load_etl_thread_num_normal_priority));
etlPriorityMap.put(TPriority.HIGH, new MasterTaskExecutor(Config.load_etl_thread_num_high_priority));
etlPriorityMap.put(TPriority.NORMAL, new MasterTaskExecutor("load_etl_thread_num_normal_priority", Config.load_etl_thread_num_normal_priority, true));
etlPriorityMap.put(TPriority.HIGH, new MasterTaskExecutor("load_etl_thread_num_high_priority", Config.load_etl_thread_num_high_priority, true));
executors.put(JobState.ETL, etlPriorityMap);
}
@ -110,6 +110,11 @@ public class LoadChecker extends MasterDaemon {
for (LoadChecker loadChecker : checkers.values()) {
loadChecker.start();
}
for (Map<TPriority, MasterTaskExecutor> map : executors.values()) {
for (MasterTaskExecutor masterTaskExecutor : map.values()) {
masterTaskExecutor.start();
}
}
}
@Override

View File

@ -86,7 +86,7 @@ public final class MetricRepo {
public static GaugeMetricImpl<Double> GAUGE_QUERY_ERR_RATE;
public static GaugeMetricImpl<Long> GAUGE_MAX_TABLET_COMPACTION_SCORE;
private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newScheduledThreadPool(1, "Metric-Timer-Pool");
private static ScheduledThreadPoolExecutor metricTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1, "Metric-Timer-Pool", true);
private static MetricCalculator metricCalculator = new MetricCalculator();
public static synchronized void init() {

View File

@ -70,7 +70,7 @@ public class MysqlServer {
}
// start accept thread
listener = ThreadPoolManager.newDaemonCacheThreadPool(1, "MySQL-Protocol-Listener");
listener = ThreadPoolManager.newDaemonCacheThreadPool(1, "MySQL-Protocol-Listener", true);
running = true;
listenerFuture = listener.submit(new Listener());

View File

@ -46,7 +46,7 @@ public class NMysqlServer extends MysqlServer {
private AcceptingChannel<StreamConnection> server;
// default task service.
private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_mysql_service_task_threads_num, "doris-mysql-nio-pool");
private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_mysql_service_task_threads_num, "doris-mysql-nio-pool", true);
public NMysqlServer(int port, ConnectScheduler connectScheduler) {
this.port = port;

View File

@ -48,14 +48,14 @@ public class ConnectScheduler {
private AtomicInteger nextConnectionId;
private Map<Long, ConnectContext> connectionMap = Maps.newHashMap();
private Map<String, AtomicInteger> connByUser = Maps.newHashMap();
private ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num, "connect-scheduler-pool");
private ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num, "connect-scheduler-pool", true);
// Use a thread to check whether connection is timeout. Because
// 1. If use a scheduler, the task maybe a huge number when query is messy.
// Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler.
// 2. Use a thread to poll maybe lose some accurate, but is enough to us.
private ScheduledExecutorService checkTimer = ThreadPoolManager.newScheduledThreadPool(1,
"Connect-Scheduler-Check-Timer");
private ScheduledExecutorService checkTimer = ThreadPoolManager.newDaemonScheduledThreadPool(1,
"Connect-Scheduler-Check-Timer", true);
public ConnectScheduler(int maxConnections) {
this.maxConnections = maxConnections;

View File

@ -70,11 +70,11 @@ public class HeartbeatMgr extends MasterDaemon {
private static volatile AtomicReference<TMasterInfo> masterInfo = new AtomicReference<>();
public HeartbeatMgr(SystemInfoService nodeMgr) {
public HeartbeatMgr(SystemInfoService nodeMgr, boolean needRegisterMetric) {
super("heartbeat mgr", FeConstants.heartbeat_interval_second * 1000);
this.nodeMgr = nodeMgr;
this.executor = ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num,
Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool");
Config.heartbeat_mgr_blocking_queue_size, "heartbeat-mgr-pool", needRegisterMetric);
this.heartbeatFlags = new HeartbeatFlags();
}

View File

@ -24,7 +24,7 @@ import java.util.concurrent.ExecutorService;
public class AgentTaskExecutor {
private static final ExecutorService EXECUTOR = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_agent_task_threads_num, "agent-task-pool");
private static final ExecutorService EXECUTOR = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_agent_task_threads_num, "agent-task-pool", true);
public AgentTaskExecutor() {

View File

@ -27,21 +27,28 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MasterTaskExecutor {
private static final Logger LOG = LogManager.getLogger(MasterTaskExecutor.class);
private ScheduledExecutorService executor;
private ThreadPoolExecutor executor;
private Map<Long, Future<?>> runningTasks;
public ScheduledThreadPoolExecutor scheduledThreadPool;
public MasterTaskExecutor(int threadNum) {
executor = ThreadPoolManager.newScheduledThreadPool(threadNum, "Master-Task-Executor-Pool");
public MasterTaskExecutor(String name, int threadNum, boolean needRegisterMetric) {
executor = ThreadPoolManager.newDaemonFixedThreadPool(threadNum, threadNum * 2, name + "_pool", needRegisterMetric);
runningTasks = Maps.newHashMap();
executor.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS);
scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric);
}
public void start() {
scheduledThreadPool.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS);
}
/**
* submit task to task executor
* @param task
@ -61,6 +68,7 @@ public class MasterTaskExecutor {
}
public void close() {
scheduledThreadPool.shutdown();
executor.shutdown();
runningTasks.clear();
}
@ -70,7 +78,7 @@ public class MasterTaskExecutor {
return runningTasks.size();
}
}
private class TaskChecker implements Runnable {
@Override
public void run() {

View File

@ -46,8 +46,8 @@ public class PullLoadJobMgr {
private int concurrency = 10;
public PullLoadJobMgr() {
executorService = ThreadPoolManager.newDaemonCacheThreadPool(concurrency, "pull-load-job-mgr");
public PullLoadJobMgr(boolean needRegisterMetric) {
executorService = ThreadPoolManager.newDaemonCacheThreadPool(concurrency, "pull-load-job-mgr", needRegisterMetric);
}
/**

View File

@ -29,9 +29,9 @@ public class ThreadPoolManagerTest {
@Test
public void testNormal() throws InterruptedException {
ThreadPoolExecutor testCachedPool = ThreadPoolManager.newDaemonCacheThreadPool(2, "test_cache_pool");
ThreadPoolExecutor testCachedPool = ThreadPoolManager.newDaemonCacheThreadPool(2, "test_cache_pool", true);
ThreadPoolExecutor testFixedThreaddPool = ThreadPoolManager.newDaemonFixedThreadPool(2, 2,
"test_fixed_thread_pool");
"test_fixed_thread_pool", true);
ThreadPoolManager.registerThreadPoolMetric("test_cache_pool", testCachedPool);
ThreadPoolManager.registerThreadPoolMetric("test_fixed_thread_pool", testFixedThreaddPool);

View File

@ -165,7 +165,7 @@ public class RoutineLoadSchedulerTest {
RoutineLoadTaskScheduler routineLoadTaskScheduler = new RoutineLoadTaskScheduler();
routineLoadTaskScheduler.setInterval(5000);
ExecutorService executorService = ThreadPoolManager.newDaemonFixedThreadPool(2, 2, "routine-load-task-scheduler");
ExecutorService executorService = ThreadPoolManager.newDaemonFixedThreadPool(2, 2, "routine-load-task-scheduler", false);
executorService.submit(routineLoadScheduler);
executorService.submit(routineLoadTaskScheduler);

View File

@ -33,7 +33,8 @@ public class MasterTaskExecutorTest {
@Before
public void setUp() {
executor = new MasterTaskExecutor(THREAD_NUM);
executor = new MasterTaskExecutor("master_task_executor_test", THREAD_NUM, false);
executor.start();
}
@After