diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 9da22b20da..44b7a27007 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1556,6 +1556,20 @@ public class Config extends ConfigBase { @ConfField public static int scheduler_job_task_max_num = 10; + /** + * The number of async tasks that can be queued. @See TaskDisruptor + * if consumer is slow, the queue will be full, and the producer will be blocked. + */ + @ConfField + public static int async_task_queen_size = 1024; + + /** + * The number of threads used to consume async tasks. @See TaskDisruptor + * if we have a lot of async tasks, we need more threads to consume them. Sure, it's depends on the cpu cores. + */ + @ConfField + public static int async_task_consumer_thread_num = 10; + // enable_workload_group should be immutable and temporarily set to mutable during the development test phase @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL) public static boolean enable_workload_group = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java index 889355f2cd..3b59a5187e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java @@ -17,6 +17,7 @@ package org.apache.doris.scheduler.disruptor; +import org.apache.doris.common.Config; import org.apache.doris.scheduler.constants.TaskType; import org.apache.doris.scheduler.manager.TimerJobManager; import org.apache.doris.scheduler.manager.TransientTaskManager; @@ -48,20 +49,15 @@ import java.util.concurrent.TimeUnit; public class TaskDisruptor implements Closeable { private final Disruptor disruptor; - private static final int DEFAULT_RING_BUFFER_SIZE = 1024; + private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size; + + private static int consumerThreadCount = Config.async_task_consumer_thread_num; /** * The default timeout for {@link #close()} in seconds. */ private static final int DEFAULT_CLOSE_WAIT_TIME_SECONDS = 5; - /** - * The default number of consumers to create for each {@link Disruptor} instance. - */ - private static final int DEFAULT_CONSUMER_COUNT = System.getProperty("event.task.disruptor.consumer.count") - == null ? Runtime.getRuntime().availableProcessors() - : Integer.parseInt(System.getProperty("event.task.disruptor.consumer.count")); - /** * Whether this disruptor has been closed. * if true, then we can't publish any more events. @@ -82,8 +78,8 @@ public class TaskDisruptor implements Closeable { ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE; disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, producerThreadFactory, ProducerType.SINGLE, new BlockingWaitStrategy()); - WorkHandler[] workers = new TaskHandler[DEFAULT_CONSUMER_COUNT]; - for (int i = 0; i < DEFAULT_CONSUMER_COUNT; i++) { + WorkHandler[] workers = new TaskHandler[consumerThreadCount]; + for (int i = 0; i < consumerThreadCount; i++) { workers[i] = new TaskHandler(timerJobManager, transientTaskManager); } disruptor.handleEventsWithWorkerPool(workers);