(Fix)(RoutineLoad)Query the transaction status NPE when the task has not yet started scheduling (#25074)

This commit is contained in:
Calvin Kirs
2023-10-07 20:26:49 +08:00
committed by GitHub
parent b380b8b0b5
commit f3e95608cb
4 changed files with 25 additions and 10 deletions

View File

@ -57,7 +57,7 @@ DataSourceProperties: {"0":19}
- `TaskId`: The unique ID of the subtask.
- `TxnId`: The import transaction ID corresponding to the subtask.
- `TxnStatus`: The import transaction status corresponding to the subtask. Usually UNKNOWN. No real meaning.
- `TxnStatus`: The import transaction status corresponding to the subtask. When TxnStatus is null, it means that the subtask has not yet started scheduling.
- `JobId`: The job ID corresponding to the subtask.
- `CreateTime`: The creation time of the subtask.
- `ExecuteStartTime`: The time when the subtask is scheduled to be executed, usually later than the creation time.

View File

@ -55,7 +55,7 @@ DataSourceProperties: {"0":19}
- `TaskId`:子任务的唯一 ID。
- `TxnId`:子任务对应的导入事务 ID。
- `TxnStatus`:子任务对应的导入事务状态。通常为 UNKNOWN。并无实际意思
- `TxnStatus`:子任务对应的导入事务状态。为 null 时表示子任务还未开始调度
- `JobId`:子任务对应的作业 ID。
- `CreateTime`:子任务的创建时间。
- `ExecuteStartTime`:子任务被调度执行的时间,通常晚于创建时间。

View File

@ -64,6 +64,7 @@ import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
@ -1440,16 +1441,25 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
}
public List<List<String>> getTasksShowInfo() {
public List<List<String>> getTasksShowInfo() throws AnalysisException {
List<List<String>> rows = Lists.newArrayList();
if (null == routineLoadTaskInfoList || routineLoadTaskInfoList.isEmpty()) {
return rows;
}
DatabaseTransactionMgr databaseTransactionMgr = Env.getCurrentEnv().getGlobalTransactionMgr()
.getDatabaseTransactionMgr(dbId);
routineLoadTaskInfoList.forEach(entity -> {
try {
entity.setTxnStatus(Env.getCurrentEnv().getGlobalTransactionMgr().getDatabaseTransactionMgr(dbId)
.getTransactionState(entity.getTxnId()).getTransactionStatus());
long txnId = entity.getTxnId();
if (RoutineLoadTaskInfo.INIT_TXN_ID == txnId) {
rows.add(entity.getTaskShowInfo());
} catch (AnalysisException e) {
LOG.warn("failed to setTxnStatus db: {}, txnId: {}, err: {}", dbId, entity.getTxnId(), e.getMessage());
return;
}
TransactionState transactionState = databaseTransactionMgr.getTransactionState(entity.getTxnId());
if (null != transactionState && null != transactionState.getTransactionStatus()) {
entity.setTxnStatus(transactionState.getTransactionStatus());
}
rows.add(entity.getTaskShowInfo());
});
return rows;
}

View File

@ -54,7 +54,8 @@ public abstract class RoutineLoadTaskInfo {
private RoutineLoadManager routineLoadManager = Env.getCurrentEnv().getRoutineLoadManager();
protected UUID id;
protected long txnId = -1L;
protected static final long INIT_TXN_ID = -1L;
protected long txnId = INIT_TXN_ID;
protected long jobId;
protected String clusterName;
@ -198,7 +199,11 @@ public abstract class RoutineLoadTaskInfo {
List<String> row = Lists.newArrayList();
row.add(DebugUtil.printId(id));
row.add(String.valueOf(txnId));
row.add(txnStatus.name());
if (INIT_TXN_ID != txnId) {
row.add(txnStatus.name());
} else {
row.add(null);
}
row.add(String.valueOf(jobId));
row.add(String.valueOf(TimeUtils.longToTimeString(createTimeMs)));
row.add(String.valueOf(TimeUtils.longToTimeString(executeStartTimeMs)));