[improve](routine_load) move log from write lock (#27576)
This commit is contained in:
committed by
GitHub
parent
d10a708fa2
commit
550f3e801d
@ -1240,21 +1240,27 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
}
|
||||
|
||||
public void updateState(JobState jobState, ErrorReason reason, boolean isReplay) throws UserException {
|
||||
writeLock();
|
||||
try {
|
||||
unprotectUpdateState(jobState, reason, isReplay);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
protected void unprotectUpdateState(JobState jobState, ErrorReason reason, boolean isReplay) throws UserException {
|
||||
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
|
||||
.add("current_job_state", getState())
|
||||
.add("desire_job_state", jobState)
|
||||
.add("msg", reason)
|
||||
.build());
|
||||
|
||||
writeLock();
|
||||
try {
|
||||
unprotectUpdateState(jobState, reason, isReplay);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
|
||||
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
|
||||
.add("current_job_state", getState())
|
||||
.add("msg", "job state has been changed")
|
||||
.add("is replay", String.valueOf(isReplay))
|
||||
.build());
|
||||
}
|
||||
|
||||
protected void unprotectUpdateState(JobState jobState, ErrorReason reason, boolean isReplay) throws UserException {
|
||||
checkStateTransform(jobState);
|
||||
switch (jobState) {
|
||||
case RUNNING:
|
||||
@ -1283,11 +1289,6 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
if (!isReplay && jobState != JobState.RUNNING) {
|
||||
Env.getCurrentEnv().getEditLog().logOpRoutineLoadJob(new RoutineLoadOperation(id, jobState));
|
||||
}
|
||||
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
|
||||
.add("current_job_state", getState())
|
||||
.add("msg", "job state has been changed")
|
||||
.add("is replay", String.valueOf(isReplay))
|
||||
.build());
|
||||
}
|
||||
|
||||
private void executeRunning() {
|
||||
@ -1331,9 +1332,18 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
writeLock();
|
||||
try {
|
||||
if (!state.isFinalState()) {
|
||||
unprotectUpdateState(JobState.CANCELLED,
|
||||
new ErrorReason(InternalErrorCode.DB_ERR, "db " + dbId + "not exist"),
|
||||
false /* not replay */);
|
||||
ErrorReason reason = new ErrorReason(InternalErrorCode.DB_ERR, "db " + dbId + "not exist");
|
||||
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
|
||||
.add("current_job_state", getState())
|
||||
.add("desire_job_state", JobState.CANCELLED)
|
||||
.add("msg", reason)
|
||||
.build());
|
||||
unprotectUpdateState(JobState.CANCELLED, reason, false /* not replay */);
|
||||
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
|
||||
.add("current_job_state", getState())
|
||||
.add("msg", "job state has been changed")
|
||||
.add("is replay", "false")
|
||||
.build());
|
||||
}
|
||||
return;
|
||||
} finally {
|
||||
@ -1350,8 +1360,18 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
|
||||
writeLock();
|
||||
try {
|
||||
if (!state.isFinalState()) {
|
||||
unprotectUpdateState(JobState.CANCELLED, new ErrorReason(InternalErrorCode.TABLE_ERR,
|
||||
"table does not exist"), false /* not replay */);
|
||||
ErrorReason reason = new ErrorReason(InternalErrorCode.TABLE_ERR, "table does not exist");
|
||||
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
|
||||
.add("current_job_state", getState())
|
||||
.add("desire_job_state", JobState.CANCELLED)
|
||||
.add("msg", reason)
|
||||
.build());
|
||||
unprotectUpdateState(JobState.CANCELLED, reason, false /* not replay */);
|
||||
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
|
||||
.add("current_job_state", getState())
|
||||
.add("msg", "job state has been changed")
|
||||
.add("is replay", "false")
|
||||
.build());
|
||||
}
|
||||
return;
|
||||
} finally {
|
||||
|
||||
@ -719,10 +719,10 @@ public class RoutineLoadManager implements Writable {
|
||||
if (job != null) {
|
||||
unprotectedRemoveJobFromDb(job);
|
||||
}
|
||||
LOG.info("replay remove routine load job: {}", operation.getId());
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
LOG.info("replay remove routine load job: {}", operation.getId());
|
||||
}
|
||||
|
||||
private void unprotectedRemoveJobFromDb(RoutineLoadJob routineLoadJob) {
|
||||
|
||||
Reference in New Issue
Block a user