diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 26b9bbd2e1..caa7d50d91 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1594,6 +1594,8 @@ public class Env { // start threads that should running on all FE private void startNonMasterDaemonThreads() { + // start load manager thread + loadManager.start(); tabletStatMgr.start(); // load and export job label cleaner thread labelCleaner.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java b/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java new file mode 100644 index 0000000000..153131ec25 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/CustomThreadFactory.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class CustomThreadFactory implements ThreadFactory { + private final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + public CustomThreadFactory(String name) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-"; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); + if (t.isDaemon()) { + t.setDaemon(false); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 73d4d1a57a..3f05e2c5a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -104,6 +104,11 @@ public class LoadManager implements Writable { this.mysqlLoadManager = new MysqlLoadManager(tokenManager); } + public void start() { + tokenManager.start(); + mysqlLoadManager.start(); + } + /** * This method will be invoked by the broker load(v2) now. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java index c25699cc63..ee71b28385 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.StringLiteral; import org.apache.doris.catalog.Env; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.LoadException; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; @@ -72,7 +73,7 @@ import java.util.concurrent.TimeUnit; public class MysqlLoadManager { private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class); - private final ThreadPoolExecutor mysqlLoadPool; + private ThreadPoolExecutor mysqlLoadPool; private final TokenManager tokenManager; private static class MySqlLoadContext { @@ -137,14 +138,20 @@ public class MysqlLoadManager { } private final Map loadContextMap = new ConcurrentHashMap<>(); - private final EvictingQueue failedRecords; - private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1); + private EvictingQueue failedRecords; + private ScheduledExecutorService periodScheduler; public MysqlLoadManager(TokenManager tokenManager) { + this.tokenManager = tokenManager; + } + + public void start() { + this.periodScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mysql-load-fail-record-cleaner")); int poolSize = Config.mysql_load_thread_pool; // MySqlLoad pool can accept 4 + 4 * 5 = 24 requests by default. - this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5, "Mysql Load", true); - this.tokenManager = tokenManager; + this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5, + "Mysql Load", true); this.failedRecords = EvictingQueue.create(Config.mysql_load_in_memory_record); this.periodScheduler.scheduleAtFixedRate(this::cleanFailedRecords, 1, 24, TimeUnit.HOURS); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java index f4cf451821..80f6c3f9b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java @@ -20,6 +20,7 @@ package org.apache.doris.load.loadv2; import org.apache.doris.catalog.Env; import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; import org.apache.doris.thrift.FrontendService; @@ -41,18 +42,21 @@ import java.util.concurrent.TimeUnit; public class TokenManager { private static final Logger LOG = LogManager.getLogger(TokenManager.class); - private final int thriftTimeoutMs = 300 * 1000; - private final EvictingQueue tokenQueue; - private final ScheduledExecutorService tokenGenerator; + private int thriftTimeoutMs = 300 * 1000; + private EvictingQueue tokenQueue; + private ScheduledExecutorService tokenGenerator; public TokenManager() { + } + + public void start() { this.tokenQueue = EvictingQueue.create(Config.token_queue_size); // init one token to avoid async issue. this.tokenQueue.offer(generateNewToken()); - this.tokenGenerator = Executors.newScheduledThreadPool(1); - this.tokenGenerator.scheduleAtFixedRate(() -> { - tokenQueue.offer(generateNewToken()); - }, 0, Config.token_generate_period_hour, TimeUnit.HOURS); + this.tokenGenerator = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("token-generator")); + this.tokenGenerator.scheduleAtFixedRate(() -> tokenQueue.offer(generateNewToken()), 0, + Config.token_generate_period_hour, TimeUnit.HOURS); } private String generateNewToken() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index 7295f40b60..b58f26b863 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MaterializedView; import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; @@ -66,9 +67,11 @@ public class MTMVJobManager { private final MTMVTaskManager taskManager; - private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-job-period-scheduler")); - private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-job-cleaner-scheduler")); private final ReentrantReadWriteLock rwLock; @@ -86,13 +89,15 @@ public class MTMVJobManager { // check the scheduler before using it // since it may be shutdown when master change to follower without process shutdown. if (periodScheduler.isShutdown()) { - periodScheduler = Executors.newScheduledThreadPool(1); + periodScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-job-period-scheduler")); } registerJobs(); if (cleanerScheduler.isShutdown()) { - cleanerScheduler = Executors.newScheduledThreadPool(1); + cleanerScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-job-cleaner-scheduler")); } cleanerScheduler.scheduleAtFixedRate(() -> { if (!Env.getCurrentEnv().isMaster()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java index d6e370480b..138ede9e07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java @@ -20,6 +20,7 @@ package org.apache.doris.mtmv; import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.mtmv.MTMVUtils.TaskState; import org.apache.doris.mtmv.metadata.MTMVJob; import org.apache.doris.mtmv.metadata.MTMVTask; @@ -65,13 +66,14 @@ public class MTMVTaskManager { // keep track of all the completed tasks private final Deque historyTasks = Queues.newLinkedBlockingDeque(); - private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1); + private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1, + new CustomThreadFactory("mtmv-task-scheduler")); private final AtomicInteger failedTaskCount = new AtomicInteger(0); public void startTaskScheduler() { if (taskScheduler.isShutdown()) { - taskScheduler = Executors.newScheduledThreadPool(1); + taskScheduler = Executors.newScheduledThreadPool(1, new CustomThreadFactory("mtmv-task-scheduler")); } taskScheduler.scheduleAtFixedRate(() -> { checkRunningTask(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java index dcf346e3eb..a8d98831f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java @@ -18,6 +18,7 @@ package org.apache.doris.scheduler.disruptor; import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.scheduler.constants.TaskType; import org.apache.doris.scheduler.manager.TimerJobManager; import org.apache.doris.scheduler.manager.TransientTaskManager; @@ -28,7 +29,6 @@ import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; -import com.lmax.disruptor.util.DaemonThreadFactory; import lombok.extern.slf4j.Slf4j; import java.io.Closeable; @@ -75,7 +75,7 @@ public class TaskDisruptor implements Closeable { }; public TaskDisruptor(TimerJobManager timerJobManager, TransientTaskManager transientTaskManager) { - ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE; + ThreadFactory producerThreadFactory = new CustomThreadFactory("task-disruptor-producer"); disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, producerThreadFactory, ProducerType.SINGLE, new BlockingWaitStrategy()); WorkHandler[] workers = new TaskHandler[consumerThreadCount]; diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java index 9c53d96443..c7a728cf04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java @@ -19,6 +19,7 @@ package org.apache.doris.scheduler.manager; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; +import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.common.DdlException; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.io.Writable; @@ -87,7 +88,8 @@ public class TimerJobManager implements Closeable, Writable { } public void start() { - dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660); + dorisTimer = new HashedWheelTimer(new CustomThreadFactory("hashed-wheel-timer"), + 1, TimeUnit.SECONDS, 660); dorisTimer.start(); Long currentTimeMs = System.currentTimeMillis(); jobMap.forEach((jobId, job) -> {