[fix](export) fix concurrent modification issue with export job #43051 (#43052)

bp #43051
This commit is contained in:
Mingyu Chen (Rayner)
2024-11-01 17:19:29 +08:00
committed by GitHub
parent 355170a921
commit cb25c40ddf

View File

@ -67,8 +67,8 @@ public class ExportMgr {
// dbid -> <label -> job>
private Map<Long, Map<String, Long>> dbTolabelToExportJobId = Maps.newHashMap();
// lock for export job
// lock is private and must use after db lock
// lock for protecting export jobs.
// need to be added when creating or cancelling export job.
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
public ExportMgr() {
@ -137,6 +137,11 @@ public class ExportMgr {
// check auth
checkCancelExportJobAuth(InternalCatalog.INTERNAL_CATALOG_NAME, stmt.getDbName(), matchExportJobs);
// Must add lock to protect export job.
// Because job may be cancelled when generating task executors,
// the cancel process may clear the task executor list at same time,
// which will cause ConcurrentModificationException
writeLock();
try {
for (ExportJob exportJob : matchExportJobs) {
// exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
@ -145,6 +150,8 @@ public class ExportMgr {
}
} catch (JobException e) {
throw new AnalysisException(e.getMessage());
} finally {
writeUnlock();
}
}
@ -459,7 +466,7 @@ public class ExportMgr {
}
public void replayUpdateJobState(ExportJobStateTransfer stateTransfer) {
readLock();
writeLock();
try {
ExportJob job = exportIdToJob.get(stateTransfer.getJobId());
job.replayExportJobState(stateTransfer.getState());
@ -468,7 +475,7 @@ public class ExportMgr {
job.setFailMsg(stateTransfer.getFailMsg());
job.setOutfileInfo(stateTransfer.getOutFileInfo());
} finally {
readUnlock();
writeUnlock();
}
}