From 550f3e801d0494ffe9b362a185a5d435cf330b32 Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Mon, 27 Nov 2023 10:47:31 +0800 Subject: [PATCH] [improve](routine_load) move log from write lock (#27576) --- .../load/routineload/RoutineLoadJob.java | 58 +++++++++++++------ .../load/routineload/RoutineLoadManager.java | 2 +- 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 299bdd88b6..c0e6d1c98f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -1240,21 +1240,27 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } public void updateState(JobState jobState, ErrorReason reason, boolean isReplay) throws UserException { - writeLock(); - try { - unprotectUpdateState(jobState, reason, isReplay); - } finally { - writeUnlock(); - } - } - - protected void unprotectUpdateState(JobState jobState, ErrorReason reason, boolean isReplay) throws UserException { LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) .add("current_job_state", getState()) .add("desire_job_state", jobState) .add("msg", reason) .build()); + writeLock(); + try { + unprotectUpdateState(jobState, reason, isReplay); + } finally { + writeUnlock(); + } + + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_job_state", getState()) + .add("msg", "job state has been changed") + .add("is replay", String.valueOf(isReplay)) + .build()); + } + + protected void unprotectUpdateState(JobState jobState, ErrorReason reason, boolean isReplay) throws UserException { checkStateTransform(jobState); switch (jobState) { case RUNNING: @@ -1283,11 +1289,6 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl if (!isReplay && jobState != JobState.RUNNING) { Env.getCurrentEnv().getEditLog().logOpRoutineLoadJob(new RoutineLoadOperation(id, jobState)); } - LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_job_state", getState()) - .add("msg", "job state has been changed") - .add("is replay", String.valueOf(isReplay)) - .build()); } private void executeRunning() { @@ -1331,9 +1332,18 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl writeLock(); try { if (!state.isFinalState()) { - unprotectUpdateState(JobState.CANCELLED, - new ErrorReason(InternalErrorCode.DB_ERR, "db " + dbId + "not exist"), - false /* not replay */); + ErrorReason reason = new ErrorReason(InternalErrorCode.DB_ERR, "db " + dbId + "not exist"); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_job_state", getState()) + .add("desire_job_state", JobState.CANCELLED) + .add("msg", reason) + .build()); + unprotectUpdateState(JobState.CANCELLED, reason, false /* not replay */); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_job_state", getState()) + .add("msg", "job state has been changed") + .add("is replay", "false") + .build()); } return; } finally { @@ -1350,8 +1360,18 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl writeLock(); try { if (!state.isFinalState()) { - unprotectUpdateState(JobState.CANCELLED, new ErrorReason(InternalErrorCode.TABLE_ERR, - "table does not exist"), false /* not replay */); + ErrorReason reason = new ErrorReason(InternalErrorCode.TABLE_ERR, "table does not exist"); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_job_state", getState()) + .add("desire_job_state", JobState.CANCELLED) + .add("msg", reason) + .build()); + unprotectUpdateState(JobState.CANCELLED, reason, false /* not replay */); + LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_job_state", getState()) + .add("msg", "job state has been changed") + .add("is replay", "false") + .build()); } return; } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index aa57f67e6f..6fb12e95df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -719,10 +719,10 @@ public class RoutineLoadManager implements Writable { if (job != null) { unprotectedRemoveJobFromDb(job); } - LOG.info("replay remove routine load job: {}", operation.getId()); } finally { writeUnlock(); } + LOG.info("replay remove routine load job: {}", operation.getId()); } private void unprotectedRemoveJobFromDb(RoutineLoadJob routineLoadJob) {