[Fix](Job)TVF Query JOB Concurrent Reading and Writing Causes Exception (#31248)

This commit is contained in:
Calvin Kirs
2024-02-22 14:02:17 +08:00
committed by yiguolei
parent e3b4b83bca
commit 7647cec509

View File

@ -50,6 +50,7 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@ -128,7 +129,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
this.executeSql = executeSql;
}
private List<T> runningTasks = new ArrayList<>();
private CopyOnWriteArrayList<T> runningTasks = new CopyOnWriteArrayList<>();
private Lock createTaskLock = new ReentrantLock();
@ -140,7 +141,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
for (T task : runningTasks) {
task.cancel();
}
runningTasks = new ArrayList<>();
runningTasks = new CopyOnWriteArrayList<>();
}
private static final ImmutableList<String> TITLE_NAMES =
@ -270,7 +271,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
public static AbstractJob readFields(DataInput in) throws IOException {
String jsonJob = Text.readString(in);
AbstractJob job = GsonUtils.GSON.fromJson(jsonJob, AbstractJob.class);
job.runningTasks = new ArrayList<>();
job.runningTasks = new CopyOnWriteArrayList();
job.createTaskLock = new ReentrantLock();
return job;
}