[Fix](MySqlLoad) Fix meaningless thread creation every time checkpoint mysql load (#26031)
Add a unified thread name setting method
This commit is contained in:
@ -1594,6 +1594,8 @@ public class Env {
|
||||
|
||||
// start threads that should running on all FE
|
||||
private void startNonMasterDaemonThreads() {
|
||||
// start load manager thread
|
||||
loadManager.start();
|
||||
tabletStatMgr.start();
|
||||
// load and export job label cleaner thread
|
||||
labelCleaner.start();
|
||||
|
||||
@ -0,0 +1,46 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.common;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class CustomThreadFactory implements ThreadFactory {
|
||||
private final AtomicInteger poolNumber = new AtomicInteger(1);
|
||||
private final ThreadGroup group;
|
||||
private final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||
private final String namePrefix;
|
||||
|
||||
public CustomThreadFactory(String name) {
|
||||
SecurityManager s = System.getSecurityManager();
|
||||
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
|
||||
namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
|
||||
if (t.isDaemon()) {
|
||||
t.setDaemon(false);
|
||||
}
|
||||
if (t.getPriority() != Thread.NORM_PRIORITY) {
|
||||
t.setPriority(Thread.NORM_PRIORITY);
|
||||
}
|
||||
return t;
|
||||
}
|
||||
}
|
||||
@ -104,6 +104,11 @@ public class LoadManager implements Writable {
|
||||
this.mysqlLoadManager = new MysqlLoadManager(tokenManager);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
tokenManager.start();
|
||||
mysqlLoadManager.start();
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will be invoked by the broker load(v2) now.
|
||||
*/
|
||||
|
||||
@ -26,6 +26,7 @@ import org.apache.doris.analysis.StringLiteral;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.CustomThreadFactory;
|
||||
import org.apache.doris.common.LoadException;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.common.UserException;
|
||||
@ -72,7 +73,7 @@ import java.util.concurrent.TimeUnit;
|
||||
public class MysqlLoadManager {
|
||||
private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class);
|
||||
|
||||
private final ThreadPoolExecutor mysqlLoadPool;
|
||||
private ThreadPoolExecutor mysqlLoadPool;
|
||||
private final TokenManager tokenManager;
|
||||
|
||||
private static class MySqlLoadContext {
|
||||
@ -137,14 +138,20 @@ public class MysqlLoadManager {
|
||||
}
|
||||
|
||||
private final Map<String, MySqlLoadContext> loadContextMap = new ConcurrentHashMap<>();
|
||||
private final EvictingQueue<MySqlLoadFailRecord> failedRecords;
|
||||
private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1);
|
||||
private EvictingQueue<MySqlLoadFailRecord> failedRecords;
|
||||
private ScheduledExecutorService periodScheduler;
|
||||
|
||||
public MysqlLoadManager(TokenManager tokenManager) {
|
||||
this.tokenManager = tokenManager;
|
||||
}
|
||||
|
||||
public void start() {
|
||||
this.periodScheduler = Executors.newScheduledThreadPool(1,
|
||||
new CustomThreadFactory("mysql-load-fail-record-cleaner"));
|
||||
int poolSize = Config.mysql_load_thread_pool;
|
||||
// MySqlLoad pool can accept 4 + 4 * 5 = 24 requests by default.
|
||||
this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5, "Mysql Load", true);
|
||||
this.tokenManager = tokenManager;
|
||||
this.mysqlLoadPool = ThreadPoolManager.newDaemonFixedThreadPool(poolSize, poolSize * 5,
|
||||
"Mysql Load", true);
|
||||
this.failedRecords = EvictingQueue.create(Config.mysql_load_in_memory_record);
|
||||
this.periodScheduler.scheduleAtFixedRate(this::cleanFailedRecords, 1, 24, TimeUnit.HOURS);
|
||||
}
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.load.loadv2;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.ClientPool;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.CustomThreadFactory;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.thrift.FrontendService;
|
||||
@ -41,18 +42,21 @@ import java.util.concurrent.TimeUnit;
|
||||
public class TokenManager {
|
||||
private static final Logger LOG = LogManager.getLogger(TokenManager.class);
|
||||
|
||||
private final int thriftTimeoutMs = 300 * 1000;
|
||||
private final EvictingQueue<String> tokenQueue;
|
||||
private final ScheduledExecutorService tokenGenerator;
|
||||
private int thriftTimeoutMs = 300 * 1000;
|
||||
private EvictingQueue<String> tokenQueue;
|
||||
private ScheduledExecutorService tokenGenerator;
|
||||
|
||||
public TokenManager() {
|
||||
}
|
||||
|
||||
public void start() {
|
||||
this.tokenQueue = EvictingQueue.create(Config.token_queue_size);
|
||||
// init one token to avoid async issue.
|
||||
this.tokenQueue.offer(generateNewToken());
|
||||
this.tokenGenerator = Executors.newScheduledThreadPool(1);
|
||||
this.tokenGenerator.scheduleAtFixedRate(() -> {
|
||||
tokenQueue.offer(generateNewToken());
|
||||
}, 0, Config.token_generate_period_hour, TimeUnit.HOURS);
|
||||
this.tokenGenerator = Executors.newScheduledThreadPool(1,
|
||||
new CustomThreadFactory("token-generator"));
|
||||
this.tokenGenerator.scheduleAtFixedRate(() -> tokenQueue.offer(generateNewToken()), 0,
|
||||
Config.token_generate_period_hour, TimeUnit.HOURS);
|
||||
}
|
||||
|
||||
private String generateNewToken() {
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MaterializedView;
|
||||
import org.apache.doris.catalog.TableIf.TableType;
|
||||
import org.apache.doris.common.CustomThreadFactory;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.io.Text;
|
||||
@ -66,9 +67,11 @@ public class MTMVJobManager {
|
||||
|
||||
private final MTMVTaskManager taskManager;
|
||||
|
||||
private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1);
|
||||
private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1,
|
||||
new CustomThreadFactory("mtmv-job-period-scheduler"));
|
||||
|
||||
private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1);
|
||||
private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1,
|
||||
new CustomThreadFactory("mtmv-job-cleaner-scheduler"));
|
||||
|
||||
private final ReentrantReadWriteLock rwLock;
|
||||
|
||||
@ -86,13 +89,15 @@ public class MTMVJobManager {
|
||||
// check the scheduler before using it
|
||||
// since it may be shutdown when master change to follower without process shutdown.
|
||||
if (periodScheduler.isShutdown()) {
|
||||
periodScheduler = Executors.newScheduledThreadPool(1);
|
||||
periodScheduler = Executors.newScheduledThreadPool(1,
|
||||
new CustomThreadFactory("mtmv-job-period-scheduler"));
|
||||
}
|
||||
|
||||
registerJobs();
|
||||
|
||||
if (cleanerScheduler.isShutdown()) {
|
||||
cleanerScheduler = Executors.newScheduledThreadPool(1);
|
||||
cleanerScheduler = Executors.newScheduledThreadPool(1,
|
||||
new CustomThreadFactory("mtmv-job-cleaner-scheduler"));
|
||||
}
|
||||
cleanerScheduler.scheduleAtFixedRate(() -> {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.mtmv;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.CustomThreadFactory;
|
||||
import org.apache.doris.mtmv.MTMVUtils.TaskState;
|
||||
import org.apache.doris.mtmv.metadata.MTMVJob;
|
||||
import org.apache.doris.mtmv.metadata.MTMVTask;
|
||||
@ -65,13 +66,14 @@ public class MTMVTaskManager {
|
||||
// keep track of all the completed tasks
|
||||
private final Deque<MTMVTask> historyTasks = Queues.newLinkedBlockingDeque();
|
||||
|
||||
private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1);
|
||||
private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1,
|
||||
new CustomThreadFactory("mtmv-task-scheduler"));
|
||||
|
||||
private final AtomicInteger failedTaskCount = new AtomicInteger(0);
|
||||
|
||||
public void startTaskScheduler() {
|
||||
if (taskScheduler.isShutdown()) {
|
||||
taskScheduler = Executors.newScheduledThreadPool(1);
|
||||
taskScheduler = Executors.newScheduledThreadPool(1, new CustomThreadFactory("mtmv-task-scheduler"));
|
||||
}
|
||||
taskScheduler.scheduleAtFixedRate(() -> {
|
||||
checkRunningTask();
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.scheduler.disruptor;
|
||||
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.CustomThreadFactory;
|
||||
import org.apache.doris.scheduler.constants.TaskType;
|
||||
import org.apache.doris.scheduler.manager.TimerJobManager;
|
||||
import org.apache.doris.scheduler.manager.TransientTaskManager;
|
||||
@ -28,7 +29,6 @@ import com.lmax.disruptor.TimeoutException;
|
||||
import com.lmax.disruptor.WorkHandler;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
import com.lmax.disruptor.util.DaemonThreadFactory;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.io.Closeable;
|
||||
@ -75,7 +75,7 @@ public class TaskDisruptor implements Closeable {
|
||||
};
|
||||
|
||||
public TaskDisruptor(TimerJobManager timerJobManager, TransientTaskManager transientTaskManager) {
|
||||
ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE;
|
||||
ThreadFactory producerThreadFactory = new CustomThreadFactory("task-disruptor-producer");
|
||||
disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, producerThreadFactory,
|
||||
ProducerType.SINGLE, new BlockingWaitStrategy());
|
||||
WorkHandler<TaskEvent>[] workers = new TaskHandler[consumerThreadCount];
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.scheduler.manager;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.CustomThreadFactory;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.PatternMatcher;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
@ -87,7 +88,8 @@ public class TimerJobManager implements Closeable, Writable {
|
||||
}
|
||||
|
||||
public void start() {
|
||||
dorisTimer = new HashedWheelTimer(1, TimeUnit.SECONDS, 660);
|
||||
dorisTimer = new HashedWheelTimer(new CustomThreadFactory("hashed-wheel-timer"),
|
||||
1, TimeUnit.SECONDS, 660);
|
||||
dorisTimer.start();
|
||||
Long currentTimeMs = System.currentTimeMillis();
|
||||
jobMap.forEach((jobId, job) -> {
|
||||
|
||||
Reference in New Issue
Block a user