branch-2.1: [Fix](job)Fix CAS competition failure leading to message publishing failure. #45018 (#45030)

Cherry-picked from #45018

Co-authored-by: Calvin Kirs <guoqiang@selectdb.com>
This commit is contained in:
github-actions[bot]
2024-12-06 09:32:15 +08:00
committed by GitHub
parent 041a1fe095
commit a67bbf3c58
2 changed files with 13 additions and 2 deletions

View File

@ -74,7 +74,16 @@ public class TaskDisruptor<T> {
public boolean publishEvent(Object... args) {
try {
RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
return ringBuffer.tryPublishEvent(eventTranslator, args);
// Check if the RingBuffer has enough capacity to reserve 10 slots for tasks
// If there is insufficient capacity (less than 10 slots available)
// log a warning and drop the current task
if (!ringBuffer.hasAvailableCapacity(10)) {
LOG.warn("ring buffer has no available capacity,task will be dropped,"
+ "please check the task queue size.");
return false;
}
ringBuffer.publishEvent(eventTranslator, args);
return true;
} catch (Exception e) {
LOG.warn("Failed to publish event", e);
// Handle the exception, e.g., retry or alert

View File

@ -168,7 +168,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
for (AbstractTask task : tasks) {
if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(),
job.getJobConfig())) {
throw new JobException(job.formatMsgWhenExecuteQueueFull(task.getTaskId()));
String errorMsg = job.formatMsgWhenExecuteQueueFull(task.getTaskId());
task.onFail(errorMsg);
throw new JobException(errorMsg);
}
log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(),