diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java index 0994a1d921..45564e99b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java @@ -50,7 +50,7 @@ public class TaskDisruptor { WaitStrategy waitStrategy, WorkHandler[] workHandlers, EventTranslatorVararg eventTranslator) { disruptor = new Disruptor<>(eventFactory, ringBufferSize, threadFactory, - ProducerType.SINGLE, waitStrategy); + ProducerType.MULTI, waitStrategy); disruptor.handleEventsWithWorkerPool(workHandlers); this.eventTranslator = eventTranslator; disruptor.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java index a20f772eda..dc065360d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java @@ -81,7 +81,7 @@ public class TaskDisruptor implements Closeable { public void start() { ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE; disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, producerThreadFactory, - ProducerType.SINGLE, new BlockingWaitStrategy()); + ProducerType.MULTI, new BlockingWaitStrategy()); WorkHandler[] workers = new TaskHandler[consumerThreadCount]; for (int i = 0; i < consumerThreadCount; i++) { workers[i] = new TaskHandler(transientTaskManager);