fix total task exec time is far more than actual (#31273)

This commit is contained in:
HHoflittlefish777
2024-02-29 14:57:52 +08:00
committed by yiguolei
parent 9c4708ee74
commit df4b289825
2 changed files with 9 additions and 11 deletions

View File

@ -38,12 +38,11 @@ import org.apache.doris.thrift.TStatusCode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
/**
* Routine load task scheduler is a function which allocate task to be.
@ -61,7 +60,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
private static final long SLOT_FULL_SLEEP_MS = 10000; // 10s
private RoutineLoadManager routineLoadManager;
private LinkedBlockingQueue<RoutineLoadTaskInfo> needScheduleTasksQueue = Queues.newLinkedBlockingQueue();
private LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue = new LinkedBlockingDeque<>();
private long lastBackendSlotUpdateTime = -1;
@ -105,7 +104,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
if (System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime()
< routineLoadTaskInfo.getTimeoutMs()) {
// try to delay scheduling this task for 'timeout', to void too many failure
needScheduleTasksQueue.put(routineLoadTaskInfo);
needScheduleTasksQueue.addLast(routineLoadTaskInfo);
return;
}
scheduleOneTask(routineLoadTaskInfo);
@ -133,7 +132,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
try {
// check if topic has more data to consume
if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
needScheduleTasksQueue.put(routineLoadTaskInfo);
needScheduleTasksQueue.addLast(routineLoadTaskInfo);
return;
}
@ -141,7 +140,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
// this should be done before txn begin, or the txn may be begun successfully but failed to be allocated.
if (!allocateTaskToBe(routineLoadTaskInfo)) {
// allocate failed, push it back to the queue to wait next scheduling
needScheduleTasksQueue.put(routineLoadTaskInfo);
needScheduleTasksQueue.addFirst(routineLoadTaskInfo);
return;
}
} catch (UserException e) {
@ -164,7 +163,7 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
// begin txn failed. push it back to the queue to wait next scheduling
// set BE id to -1 to release the BE slot
routineLoadTaskInfo.setBeId(-1);
needScheduleTasksQueue.put(routineLoadTaskInfo);
needScheduleTasksQueue.addFirst(routineLoadTaskInfo);
return;
}
} catch (Exception e) {