From a7fe2ae9010fe882dcad3e456c6717d2b666b440 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 12 Sep 2024 22:42:45 +0800 Subject: [PATCH] [branch-2.1][Fix](Job)Replace BlockingWaitStrategy with LiteTimeoutBlockingWaitStrategy to avoid deadlock issues. (#40625) (#40707) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit … FYI https://issues.apache.org/jira/browse/LOG4J2-1221 - BlockingWaitStrategy is a wait strategy used in the Disruptor framework that blocks the thread when the ring buffer is full or not yet available for publishing. When threads are blocked, they are waiting for space in the ring buffer to become available, which can lead to potential deadlocks if not managed properly. Timeout Handling: - LiteTimeoutBlockingWaitStrategy provides a timeout for waiting threads. If the buffer is not ready within the timeout period, the thread is released, preventing it from being blocked indefinitely. Reduced Risk of Deadlocks: - By avoiding indefinite blocking, this strategy reduces the risk of deadlocks caused by threads waiting on each other. The timeout allows the system to handle scenarios where resources are temporarily (cherry picked from commit 087048f2b2f06eae15e80e7ffa209610c9e3f173) ## Proposed changes Issue Number: close #xxx --- .../doris/job/disruptor/TaskDisruptor.java | 15 ++++++++++--- .../job/executor/DispatchTaskHandler.java | 5 ++++- .../job/executor/TimerJobSchedulerTask.java | 5 ++++- .../manager/TaskDisruptorGroupManager.java | 22 +++++++++---------- .../doris/job/scheduler/JobScheduler.java | 13 ++++++----- .../scheduler/disruptor/TaskDisruptor.java | 10 ++++----- 6 files changed, 44 insertions(+), 26 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java index 45564e99b1..6ca2924c59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java @@ -24,6 +24,8 @@ import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.concurrent.ThreadFactory; @@ -33,6 +35,7 @@ import java.util.concurrent.ThreadFactory; * @param the type of the event handled by the Disruptor */ public class TaskDisruptor { + private static final Logger LOG = LogManager.getLogger(TaskDisruptor.class); private final Disruptor disruptor; private final EventTranslatorVararg eventTranslator; @@ -68,9 +71,15 @@ public class TaskDisruptor { * * @param args the arguments for the event */ - public void publishEvent(Object... args) { - RingBuffer ringBuffer = disruptor.getRingBuffer(); - ringBuffer.publishEvent(eventTranslator, args); + public boolean publishEvent(Object... args) { + try { + RingBuffer ringBuffer = disruptor.getRingBuffer(); + return ringBuffer.tryPublishEvent(eventTranslator, args); + } catch (Exception e) { + LOG.warn("Failed to publish event", e); + // Handle the exception, e.g., retry or alert + } + return false; } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java index e5933d133c..d93393aa0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java @@ -66,7 +66,10 @@ public class DispatchTaskHandler implements WorkHandler implements TimerTask { log.info("job status is not running, job id is {}, skip dispatch", this.job.getJobId()); return; } - dispatchDisruptor.publishEvent(this.job); + if (!dispatchDisruptor.publishEvent(this.job)) { + log.warn("dispatch timer job failed, job id is {}, job name is {}", + this.job.getJobId(), this.job.getJobName()); + } } catch (Exception e) { log.warn("dispatch timer job error, task id is {}", this.job.getJobId(), e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java index 4e31e46701..b1ccb97644 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java @@ -31,15 +31,16 @@ import org.apache.doris.job.extensions.insert.InsertTask; import org.apache.doris.job.extensions.mtmv.MTMVTask; import org.apache.doris.job.task.AbstractTask; -import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventTranslatorVararg; +import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy; import com.lmax.disruptor.WorkHandler; import lombok.Getter; import java.util.EnumMap; import java.util.Map; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; public class TaskDisruptorGroupManager { @@ -86,7 +87,8 @@ public class TaskDisruptorGroupManager { (event, sequence, args) -> event.setJob((AbstractJob) args[0]); this.dispatchDisruptor = new TaskDisruptor<>(dispatchEventFactory, DISPATCH_TIMER_JOB_QUEUE_SIZE, dispatchThreadFactory, - new BlockingWaitStrategy(), dispatchTaskExecutorHandlers, eventTranslator); + new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS), + dispatchTaskExecutorHandlers, eventTranslator); } private void registerInsertDisruptor() { @@ -102,7 +104,8 @@ public class TaskDisruptorGroupManager { event.setJobConfig((JobExecutionConfiguration) args[1]); }; TaskDisruptor insertDisruptor = new TaskDisruptor<>(insertEventFactory, DISPATCH_INSERT_TASK_QUEUE_SIZE, - insertTaskThreadFactory, new BlockingWaitStrategy(), insertTaskExecutorHandlers, eventTranslator); + insertTaskThreadFactory, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS), + insertTaskExecutorHandlers, eventTranslator); disruptorMap.put(JobType.INSERT, insertDisruptor); } @@ -119,17 +122,14 @@ public class TaskDisruptorGroupManager { event.setJobConfig((JobExecutionConfiguration) args[1]); }; TaskDisruptor mtmvDisruptor = new TaskDisruptor<>(mtmvEventFactory, DISPATCH_MTMV_TASK_QUEUE_SIZE, - mtmvTaskThreadFactory, new BlockingWaitStrategy(), insertTaskExecutorHandlers, eventTranslator); + mtmvTaskThreadFactory, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS), + insertTaskExecutorHandlers, eventTranslator); disruptorMap.put(JobType.MV, mtmvDisruptor); } - public void dispatchTimerJob(AbstractJob job) { - dispatchDisruptor.publishEvent(job); - } - - public void dispatchInstantTask(AbstractTask task, JobType jobType, - JobExecutionConfiguration jobExecutionConfiguration) { - disruptorMap.get(jobType).publishEvent(task, jobExecutionConfiguration); + public boolean dispatchInstantTask(AbstractTask task, JobType jobType, + JobExecutionConfiguration jobExecutionConfiguration) { + return disruptorMap.get(jobType).publishEvent(task, jobExecutionConfiguration); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 33d12c30a4..862b85597c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -155,7 +155,7 @@ public class JobScheduler, C> implements Closeable { } - public void schedulerInstantJob(T job, TaskType taskType, C context) { + public void schedulerInstantJob(T job, TaskType taskType, C context) throws JobException { List tasks = job.commonCreateTasks(taskType, context); if (CollectionUtils.isEmpty(tasks)) { log.info("job create task is empty, skip scheduler, job id is {}, job name is {}", job.getJobId(), @@ -165,12 +165,15 @@ public class JobScheduler, C> implements Closeable { } return; } - tasks.forEach(task -> { - taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(), - job.getJobConfig()); + for (AbstractTask task : tasks) { + if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(), + job.getJobConfig())) { + throw new JobException("dispatch instant task failed, job id is " + + job.getJobId() + ", task id is " + task.getTaskId()); + } log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(), job.getJobName(), task.getTaskId()); - }); + } } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java index 57df84a0e8..345b31d6bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java @@ -21,8 +21,8 @@ import org.apache.doris.common.Config; import org.apache.doris.common.CustomThreadFactory; import org.apache.doris.scheduler.constants.TaskType; -import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventTranslatorThreeArg; +import com.lmax.disruptor.LiteTimeoutBlockingWaitStrategy; import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.dsl.Disruptor; @@ -44,7 +44,7 @@ import java.util.concurrent.TimeUnit; @Log4j2 public class TaskDisruptor implements Closeable { - private Disruptor disruptor; + private Disruptor disruptor; private static final int DEFAULT_RING_BUFFER_SIZE = Config.async_task_queen_size; private static final int consumerThreadCount = Config.async_task_consumer_thread_num; @@ -74,7 +74,7 @@ public class TaskDisruptor implements Closeable { public void start() { CustomThreadFactory exportTaskThreadFactory = new CustomThreadFactory("export-task-consumer"); disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, exportTaskThreadFactory, - ProducerType.SINGLE, new BlockingWaitStrategy()); + ProducerType.SINGLE, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS)); WorkHandler[] workers = new TaskHandler[consumerThreadCount]; for (int i = 0; i < consumerThreadCount; i++) { workers[i] = new TaskHandler(); @@ -109,7 +109,7 @@ public class TaskDisruptor implements Closeable { try { disruptor.publishEvent(TRANSLATOR, jobId, taskId, taskType); } catch (Exception e) { - log.error("tryPublish failed, jobId: {}", jobId, e); + log.warn("tryPublish failed, jobId: {}", jobId, e); } } @@ -127,7 +127,7 @@ public class TaskDisruptor implements Closeable { try { disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TRANSIENT_TASK); } catch (Exception e) { - log.error("tryPublish failed, taskId: {}", taskId, e); + log.warn("tryPublish failed, taskId: {}", taskId, e); } }