From f3e95608cb37e7bed31591a42c31fc55dbfdaf24 Mon Sep 17 00:00:00 2001
From: Calvin Kirs
Date: Sat, 7 Oct 2023 20:26:49 +0800
Subject: [PATCH] (Fix)(RoutineLoad)Query the transaction status NPE when the task has not yet started scheduling (#25074)

---
 .../Show-Statements/SHOW-ROUTINE-LOAD-TASK.md |  2 +-
 .../Show-Statements/SHOW-ROUTINE-LOAD-TASK.md |  2 +-
 .../load/routineload/RoutineLoadJob.java      | 22 ++++++++++++++-----
 .../load/routineload/RoutineLoadTaskInfo.java |  9 ++++++--
 4 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md
index f144ff30a2..83411cab58 100644
--- a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md
+++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md
@@ -57,7 +57,7 @@ DataSourceProperties: {"0":19}
 
 - `TaskId`: The unique ID of the subtask.
 - `TxnId`: The import transaction ID corresponding to the subtask.
-- `TxnStatus`: The import transaction status corresponding to the subtask. Usually UNKNOWN. No real meaning.
+- `TxnStatus`: The import transaction status corresponding to the subtask. A null TxnStatus means the subtask has not been scheduled yet.
 - `JobId`: The job ID corresponding to the subtask.
 - `CreateTime`: The creation time of the subtask.
 - `ExecuteStartTime`: The time when the subtask is scheduled to be executed, usually later than the creation time.
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md
index c8a7c0f5bd..446b43a779 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md
@@ -55,7 +55,7 @@ DataSourceProperties: {"0":19}
 
 - `TaskId`:子任务的唯一 ID。
 - `TxnId`:子任务对应的导入事务 ID。
-- `TxnStatus`:子任务对应的导入事务状态。通常为 UNKNOWN。并无实际意思。
+- `TxnStatus`:子任务对应的导入事务状态。为 null 时表示子任务还未开始调度。
 - `JobId`:子任务对应的作业 ID。
 - `CreateTime`:子任务的创建时间。
 - `ExecuteStartTime`:子任务被调度执行的时间,通常晚于创建时间。
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 46183588df..efa498c56b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -64,6 +64,7 @@ import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TPipelineFragmentParams;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
+import org.apache.doris.transaction.DatabaseTransactionMgr;
 import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
@@ -1440,16 +1441,25 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         }
     }
 
-    public List<List<String>> getTasksShowInfo() {
+    public List<List<String>> getTasksShowInfo() throws AnalysisException {
         List<List<String>> rows = Lists.newArrayList();
+        if (null == routineLoadTaskInfoList || routineLoadTaskInfoList.isEmpty()) {
+            return rows;
+        }
+        DatabaseTransactionMgr databaseTransactionMgr = Env.getCurrentEnv().getGlobalTransactionMgr()
+                .getDatabaseTransactionMgr(dbId);
+
         routineLoadTaskInfoList.forEach(entity -> {
-            try {
-                entity.setTxnStatus(Env.getCurrentEnv().getGlobalTransactionMgr().getDatabaseTransactionMgr(dbId)
-                        .getTransactionState(entity.getTxnId()).getTransactionStatus());
+            long txnId = entity.getTxnId();
+            if (RoutineLoadTaskInfo.INIT_TXN_ID == txnId) { // not scheduled yet: no transaction to look up
                 rows.add(entity.getTaskShowInfo());
-            } catch (AnalysisException e) {
-                LOG.warn("failed to setTxnStatus db: {}, txnId: {}, err: {}", dbId, entity.getTxnId(), e.getMessage());
+                return;
             }
+            TransactionState transactionState = databaseTransactionMgr.getTransactionState(txnId);
+            if (null != transactionState && null != transactionState.getTransactionStatus()) { // else keep old status
+                entity.setTxnStatus(transactionState.getTransactionStatus());
+            }
+            rows.add(entity.getTaskShowInfo());
         });
         return rows;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 77758a72d6..f80e377dac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -54,7 +54,8 @@ public abstract class RoutineLoadTaskInfo {
     private RoutineLoadManager routineLoadManager = Env.getCurrentEnv().getRoutineLoadManager();
 
     protected UUID id;
-    protected long txnId = -1L;
+    protected static final long INIT_TXN_ID = -1L;
+    protected long txnId = INIT_TXN_ID;
     protected long jobId;
     protected String clusterName;
 
@@ -198,7 +199,11 @@ public abstract class RoutineLoadTaskInfo {
         List<String> row = Lists.newArrayList();
         row.add(DebugUtil.printId(id));
         row.add(String.valueOf(txnId));
-        row.add(txnStatus.name());
+        if (INIT_TXN_ID != txnId) {
+            row.add(txnStatus.name());
+        } else {
+            row.add(null);
+        }
         row.add(String.valueOf(jobId));
         row.add(String.valueOf(TimeUtils.longToTimeString(createTimeMs)));
         row.add(String.valueOf(TimeUtils.longToTimeString(executeStartTimeMs)));