[branch-2.1](routine-load) optimize routine load job auto resume policy (#37373)
pick #35266
This commit is contained in:
@ -1257,7 +1257,7 @@ public class Config extends ConfigBase {
|
||||
* a period for auto resume routine load
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int period_of_auto_resume_min = 5;
|
||||
public static int period_of_auto_resume_min = 10;
|
||||
|
||||
/**
|
||||
* If set to true, the backend will be automatically dropped after finishing decommission.
|
||||
|
||||
@ -224,9 +224,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
protected int currentTaskConcurrentNum;
|
||||
protected RoutineLoadProgress progress;
|
||||
|
||||
protected long firstResumeTimestamp; // the first resume time
|
||||
protected long latestResumeTimestamp; // the latest resume time
|
||||
protected long autoResumeCount;
|
||||
protected boolean autoResumeLock = false; //it can't auto resume iff true
|
||||
// some other msg which need to show to user;
|
||||
protected String otherMsg = "";
|
||||
protected ErrorReason pauseReason;
|
||||
|
||||
@ -367,8 +367,7 @@ public class RoutineLoadManager implements Writable {
|
||||
try {
|
||||
routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
|
||||
routineLoadJob.autoResumeCount = 0;
|
||||
routineLoadJob.firstResumeTimestamp = 0;
|
||||
routineLoadJob.autoResumeLock = false;
|
||||
routineLoadJob.latestResumeTimestamp = 0;
|
||||
routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */);
|
||||
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
|
||||
.add("current_state", routineLoadJob.getState())
|
||||
|
||||
@ -31,6 +31,10 @@ import org.apache.logging.log4j.Logger;
|
||||
public class ScheduleRule {
|
||||
private static final Logger LOG = LogManager.getLogger(ScheduleRule.class);
|
||||
|
||||
private static final long BACK_OFF_BASIC_TIME_SEC = 10L;
|
||||
|
||||
private static final long MAX_BACK_OFF_TIME_SEC = 60 * 5;
|
||||
|
||||
private static int deadBeCount() {
|
||||
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
|
||||
int total = systemInfoService.getAllBackendIds(false).size();
|
||||
@ -47,22 +51,10 @@ public class ScheduleRule {
|
||||
if (jobRoutine.state != RoutineLoadJob.JobState.PAUSED) {
|
||||
return false;
|
||||
}
|
||||
if (jobRoutine.autoResumeLock) { //only manual resume for unlock
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("routine load job {}'s autoResumeLock is true, skip", jobRoutine.id);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* Handle all backends are down.
|
||||
*/
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("try to auto reschedule routine load {}, firstResumeTimestamp: {}, autoResumeCount: {}, "
|
||||
+ "pause reason: {}",
|
||||
jobRoutine.id, jobRoutine.firstResumeTimestamp, jobRoutine.autoResumeCount,
|
||||
jobRoutine.pauseReason == null ? "null" : jobRoutine.pauseReason.getCode().name());
|
||||
}
|
||||
if (jobRoutine.pauseReason != null
|
||||
&& jobRoutine.pauseReason.getCode() != InternalErrorCode.MANUAL_PAUSE_ERR
|
||||
&& jobRoutine.pauseReason.getCode() != InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR
|
||||
@ -77,19 +69,25 @@ public class ScheduleRule {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (jobRoutine.firstResumeTimestamp == 0) { //the first resume
|
||||
jobRoutine.firstResumeTimestamp = System.currentTimeMillis();
|
||||
if (jobRoutine.latestResumeTimestamp == 0) { //the first resume
|
||||
jobRoutine.latestResumeTimestamp = System.currentTimeMillis();
|
||||
jobRoutine.autoResumeCount = 1;
|
||||
return true;
|
||||
} else {
|
||||
long current = System.currentTimeMillis();
|
||||
if (current - jobRoutine.firstResumeTimestamp < Config.period_of_auto_resume_min * 60000L) {
|
||||
if (jobRoutine.autoResumeCount >= 3) {
|
||||
jobRoutine.autoResumeLock = true; // locked Auto Resume RoutineLoadJob
|
||||
return false;
|
||||
if (current - jobRoutine.latestResumeTimestamp < Config.period_of_auto_resume_min * 60000L) {
|
||||
long autoResumeIntervalTimeSec =
|
||||
Math.min((long) Math.pow(2, jobRoutine.autoResumeCount) * BACK_OFF_BASIC_TIME_SEC,
|
||||
MAX_BACK_OFF_TIME_SEC);
|
||||
if (current - jobRoutine.latestResumeTimestamp > autoResumeIntervalTimeSec * 1000L) {
|
||||
LOG.info("try to auto reschedule routine load {}, latestResumeTimestamp: {},"
|
||||
+ " autoResumeCount: {}, pause reason: {}",
|
||||
jobRoutine.id, jobRoutine.latestResumeTimestamp, jobRoutine.autoResumeCount,
|
||||
jobRoutine.pauseReason == null ? "null" : jobRoutine.pauseReason.getCode().name());
|
||||
jobRoutine.latestResumeTimestamp = System.currentTimeMillis();
|
||||
jobRoutine.autoResumeCount++;
|
||||
return true;
|
||||
}
|
||||
jobRoutine.autoResumeCount++;
|
||||
return true;
|
||||
} else {
|
||||
/**
|
||||
* for example:
|
||||
@ -98,7 +96,7 @@ public class ScheduleRule {
|
||||
* the third resume time at 10:20
|
||||
* --> we must be reset counter because a new period for AutoResume RoutineLoadJob
|
||||
*/
|
||||
jobRoutine.firstResumeTimestamp = current;
|
||||
jobRoutine.latestResumeTimestamp = current;
|
||||
jobRoutine.autoResumeCount = 1;
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -638,21 +638,20 @@ public class RoutineLoadManagerTest {
|
||||
|
||||
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState());
|
||||
|
||||
// 第一次自动恢复
|
||||
for (int i = 0; i < 3; i++) {
|
||||
Deencapsulation.setField(routineLoadJob, "pauseReason",
|
||||
new ErrorReason(InternalErrorCode.REPLICA_FEW_ERR, ""));
|
||||
try {
|
||||
Thread.sleep(((long) Math.pow(2, i) * 10 * 1000L));
|
||||
} catch (InterruptedException e) {
|
||||
throw new UserException("thread sleep failed");
|
||||
}
|
||||
routineLoadManager.updateRoutineLoadJob();
|
||||
Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState());
|
||||
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
|
||||
boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, "autoResumeLock");
|
||||
Assert.assertEquals(autoResumeLock, false);
|
||||
}
|
||||
// 第四次自动恢复 就会锁定
|
||||
routineLoadManager.updateRoutineLoadJob();
|
||||
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState());
|
||||
boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, "autoResumeLock");
|
||||
Assert.assertEquals(autoResumeLock, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user