From b6ca76e7d44411a3b888bd021e2d7a60b9348ecf Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Fri, 23 Feb 2024 20:55:44 +0800 Subject: [PATCH] fix routine load job throw exception after commit (#31303) --- .../apache/doris/load/routineload/RoutineLoadJob.java | 10 ++++++++++ .../doris/load/routineload/RoutineLoadJobTest.java | 1 - 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index a8fb467a9b..8760dc4b71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -1018,6 +1018,16 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl // find task in job Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter( entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst(); + if (!routineLoadTaskInfoOptional.isPresent()) { + // not find task in routineLoadTaskInfoList. this may happen in following case: + // the routine load job has been paused and before transaction committed. + // The routineLoadTaskInfoList will be cleared when job being paused. + // So the task can not be found here. + // This is a normal case, we just print a log here to observe. + LOG.info("Can not find task with transaction {} after committed, job: {}", + txnState.getTransactionId(), id); + return; + } RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get(); taskBeId = routineLoadTaskInfo.getBeId(); executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED, null); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java index df7f363381..8d90395745 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java @@ -172,7 +172,6 @@ public class RoutineLoadJobTest { Deencapsulation.setField(routineLoadJob, "progress", progress); try { routineLoadJob.afterCommitted(transactionState, true); - Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); } catch (TransactionException e) { Assert.fail(); }