[fix](routine-load) add lock to avoid out-of-order editlog when jobs are updated concurrently (#31095)
This commit is contained in:
committed by
yiguolei
parent
7a1bd6abb0
commit
8f70c00a26
@ -307,12 +307,23 @@ public class RoutineLoadManager implements Writable {
|
||||
public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt)
|
||||
throws UserException {
|
||||
List<RoutineLoadJob> jobs = Lists.newArrayList();
|
||||
if (pauseRoutineLoadStmt.isAll()) {
|
||||
jobs = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
|
||||
} else {
|
||||
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
|
||||
pauseRoutineLoadStmt.getName());
|
||||
jobs.add(routineLoadJob);
|
||||
// The read lock must be held while getting the routine load job;
|
||||
// otherwise, the editLog may be written out of order in the following scenario:
|
||||
// thread A: creates the job and records the job meta
|
||||
// thread B: changes the job state and persists it in the editLog according to the meta
|
||||
// thread A: persists the job creation in the editLog
|
||||
// which will cause a NullPointerException when replaying the editLog
|
||||
readLock();
|
||||
try {
|
||||
if (pauseRoutineLoadStmt.isAll()) {
|
||||
jobs = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
|
||||
} else {
|
||||
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
|
||||
pauseRoutineLoadStmt.getName());
|
||||
jobs.add(routineLoadJob);
|
||||
}
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
|
||||
for (RoutineLoadJob routineLoadJob : jobs) {
|
||||
@ -373,8 +384,20 @@ public class RoutineLoadManager implements Writable {
|
||||
|
||||
public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt)
|
||||
throws UserException {
|
||||
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(),
|
||||
stopRoutineLoadStmt.getName());
|
||||
RoutineLoadJob routineLoadJob;
|
||||
// The read lock must be held while getting the routine load job;
|
||||
// otherwise, the editLog may be written out of order in the following scenario:
|
||||
// thread A: creates the job and records the job meta
|
||||
// thread B: changes the job state and persists it in the editLog according to the meta
|
||||
// thread A: persists the job creation in the editLog
|
||||
// which will cause a NullPointerException when replaying the editLog
|
||||
readLock();
|
||||
try {
|
||||
routineLoadJob = checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(),
|
||||
stopRoutineLoadStmt.getName());
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED,
|
||||
new ErrorReason(InternalErrorCode.MANUAL_STOP_ERR,
|
||||
"User " + ConnectContext.get().getQualifiedUser() + " stop routine load job"),
|
||||
@ -796,6 +819,9 @@ public class RoutineLoadManager implements Writable {
|
||||
job.updateState(operation.getJobState(), null, true /* is replay */);
|
||||
} catch (UserException e) {
|
||||
LOG.error("should not happened", e);
|
||||
} catch (NullPointerException npe) {
|
||||
LOG.error("cannot get job when replaying state change job, which is unexpected, job id: "
|
||||
+ operation.getId());
|
||||
}
|
||||
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, operation.getId())
|
||||
.add("current_state", operation.getJobState())
|
||||
@ -807,7 +833,19 @@ public class RoutineLoadManager implements Writable {
|
||||
* Enter of altering a routine load job
|
||||
*/
|
||||
public void alterRoutineLoadJob(AlterRoutineLoadStmt stmt) throws UserException {
|
||||
RoutineLoadJob job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel());
|
||||
RoutineLoadJob job;
|
||||
// The read lock must be held while getting the routine load job;
|
||||
// otherwise, the editLog may be written out of order in the following scenario:
|
||||
// thread A: creates the job and records the job meta
|
||||
// thread B: changes the job state and persists it in the editLog according to the meta
|
||||
// thread A: persists the job creation in the editLog
|
||||
// which will cause a NullPointerException when replaying the editLog
|
||||
readLock();
|
||||
try {
|
||||
job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel());
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
if (stmt.hasDataSourceProperty()
|
||||
&& !stmt.getDataSourceProperties().getDataSourceType().equalsIgnoreCase(job.dataSourceType.name())) {
|
||||
throw new DdlException("The specified job type is not: "
|
||||
|
||||
Reference in New Issue
Block a user