From a1aa9b8ab9f242dc81784d7e15972085d53d2ce4 Mon Sep 17 00:00:00 2001 From: hui lai <1353307710@qq.com> Date: Mon, 19 Aug 2024 21:18:27 +0800 Subject: [PATCH] [fix](routine load) add read lock to fix some concurrent bugs (#39242) (#39525) pick #39242 --- .../load/routineload/RoutineLoadJob.java | 55 +++++++++++-------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 5ef531bb37..d3bd24a0b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -1566,24 +1566,28 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl public List> getTasksShowInfo() throws AnalysisException { List> rows = Lists.newArrayList(); - if (null == routineLoadTaskInfoList || routineLoadTaskInfoList.isEmpty()) { - return rows; - } - - routineLoadTaskInfoList.forEach(entity -> { - long txnId = entity.getTxnId(); - if (RoutineLoadTaskInfo.INIT_TXN_ID == txnId) { + readLock(); + try { + if (null == routineLoadTaskInfoList || routineLoadTaskInfoList.isEmpty()) { + return rows; + } + routineLoadTaskInfoList.forEach(entity -> { + long txnId = entity.getTxnId(); + if (RoutineLoadTaskInfo.INIT_TXN_ID == txnId) { + rows.add(entity.getTaskShowInfo()); + return; + } + TransactionState transactionState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(dbId, entity.getTxnId()); + if (null != transactionState && null != transactionState.getTransactionStatus()) { + entity.setTxnStatus(transactionState.getTransactionStatus()); + } rows.add(entity.getTaskShowInfo()); - return; - } - TransactionState transactionState = Env.getCurrentGlobalTransactionMgr() - .getTransactionState(dbId, entity.getTxnId()); - if (null != transactionState && null != transactionState.getTransactionStatus()) { - entity.setTxnStatus(transactionState.getTransactionStatus()); - } - rows.add(entity.getTaskShowInfo()); - }); - return rows; + }); + return rows; + } finally { + readUnlock(); + } } public String getShowCreateInfo() { @@ -1699,12 +1703,17 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl private String getTaskStatistic() { Map result = Maps.newHashMap(); - result.put("running_task", - String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> entity.isRunning()).count())); - result.put("waiting_task", - String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> !entity.isRunning()).count())); - Gson gson = new GsonBuilder().disableHtmlEscaping().create(); - return gson.toJson(result); + readLock(); + try { + result.put("running_task", + String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> entity.isRunning()).count())); + result.put("waiting_task", + String.valueOf(routineLoadTaskInfoList.stream().filter(entity -> !entity.isRunning()).count())); + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + return gson.toJson(result); + } finally { + readUnlock(); + } } private String jobPropertiesToJsonString() {