diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 0a947701ef..889d240ce2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -1142,11 +1142,24 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl if (txnStatusChangeReasonString != null) { txnStatusChangeReason = TransactionState.TxnStatusChangeReason.fromString(txnStatusChangeReasonString); + String msg; if (txnStatusChangeReason != null) { switch (txnStatusChangeReason) { case OFFSET_OUT_OF_RANGE: + msg = "be " + taskBeId + " abort task," + + " task id: " + routineLoadTaskInfo.getId() + + " job id: " + routineLoadTaskInfo.getJobId() + + " with reason: " + txnStatusChangeReasonString + + " the offset used by job does not exist in kafka," + + " please check the offset," + + " using the Alter ROUTINE LOAD command to modify it," + + " and resume the job"; + updateState(JobState.PAUSED, + new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg), + false /* not replay */); + return; case PAUSE: - String msg = "be " + taskBeId + " abort task " + msg = "be " + taskBeId + " abort task " + "with reason: " + txnStatusChangeReasonString; updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),