[fix](routine load) add read lock to fix some concurrent bugs (#39242) (#39525)

pick #39242
This commit is contained in:
hui lai
2024-08-19 21:18:27 +08:00
committed by GitHub
parent fb17f204d7
commit a1aa9b8ab9

View File

@ -1566,24 +1566,28 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
public List<List<String>> getTasksShowInfo() throws AnalysisException {
List<List<String>> rows = Lists.newArrayList();
if (null == routineLoadTaskInfoList || routineLoadTaskInfoList.isEmpty()) {
return rows;
}
routineLoadTaskInfoList.forEach(entity -> {
long txnId = entity.getTxnId();
if (RoutineLoadTaskInfo.INIT_TXN_ID == txnId) {
readLock();
try {
if (null == routineLoadTaskInfoList || routineLoadTaskInfoList.isEmpty()) {
return rows;
}
routineLoadTaskInfoList.forEach(entity -> {
long txnId = entity.getTxnId();
if (RoutineLoadTaskInfo.INIT_TXN_ID == txnId) {
rows.add(entity.getTaskShowInfo());
return;
}
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, entity.getTxnId());
if (null != transactionState && null != transactionState.getTransactionStatus()) {
entity.setTxnStatus(transactionState.getTransactionStatus());
}
rows.add(entity.getTaskShowInfo());
return;
}
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, entity.getTxnId());
if (null != transactionState && null != transactionState.getTransactionStatus()) {
entity.setTxnStatus(transactionState.getTransactionStatus());
}
rows.add(entity.getTaskShowInfo());
});
return rows;
});
return rows;
} finally {
readUnlock();
}
}
public String getShowCreateInfo() {
@ -1699,12 +1703,17 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
private String getTaskStatistic() {
Map<String, String> result = Maps.newHashMap();
result.put("running_task",
String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> entity.isRunning()).count()));
result.put("waiting_task",
String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> !entity.isRunning()).count()));
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
return gson.toJson(result);
readLock();
try {
result.put("running_task",
String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> entity.isRunning()).count()));
result.put("waiting_task",
String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> !entity.isRunning()).count()));
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
return gson.toJson(result);
} finally {
readUnlock();
}
}
private String jobPropertiesToJsonString() {