diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 7eb7806f51..356262f8c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -488,6 +488,7 @@ public class RoutineLoadManager implements Writable { readLock(); try { Map beIdToConcurrentTasks = getBeCurrentTasksNumMap(); + int previousBeIdleTaskNum = 0; // 1. Find if the given BE id has more than half of available slots if (previousBeId != -1L && availableBeIds.contains(previousBeId)) { @@ -495,22 +496,22 @@ public class RoutineLoadManager implements Writable { Backend previousBackend = Env.getCurrentSystemInfo().getBackend(previousBeId); // check previousBackend is not null && load available if (previousBackend != null && previousBackend.isLoadAvailable()) { - int idleTaskNum = 0; if (!beIdToMaxConcurrentTasks.containsKey(previousBeId)) { - idleTaskNum = 0; + previousBeIdleTaskNum = 0; } else if (beIdToConcurrentTasks.containsKey(previousBeId)) { - idleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId) + previousBeIdleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId) - beIdToConcurrentTasks.get(previousBeId); } else { - idleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId); + previousBeIdleTaskNum = beIdToMaxConcurrentTasks.get(previousBeId); } - if (idleTaskNum > (Config.max_routine_load_task_num_per_be >> 1)) { + if (previousBeIdleTaskNum == Config.max_routine_load_task_num_per_be) { return previousBeId; } } } - // 2. The given BE id does not have available slots, find a BE with min tasks + // 2. we believe that the benefits of load balance outweigh the benefits of object pool cache, + // so we try to find the one with the most idle slots as much as possible // 3. The previous BE is not in cluster && is not load available, find a new BE with min tasks int idleTaskNum = 0; long resultBeId = -1L; @@ -530,6 +531,11 @@ public class RoutineLoadManager implements Writable { maxIdleSlotNum = Math.max(maxIdleSlotNum, idleTaskNum); } } + // 4. on the basis of selecting the maximum idle slot be, + // try to reuse the object cache as much as possible + if (previousBeIdleTaskNum == maxIdleSlotNum) { + return previousBeId; + } return resultBeId; } finally { readUnlock();