diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index d89bde719d..13386842de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -511,10 +511,10 @@ public class RoutineLoadManager implements Writable { routineLoadJob.getState()); Catalog.getCurrentCatalog().getEditLog().logRemoveRoutineLoadJob(operation); LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()) - .add("end_timestamp", routineLoadJob.getEndTimestamp()) - .add("current_timestamp", currentTimestamp) - .add("job_state", routineLoadJob.getState()) - .add("msg", "old job has been cleaned") + .add("end_timestamp", routineLoadJob.getEndTimestamp()) + .add("current_timestamp", currentTimestamp) + .add("job_state", routineLoadJob.getState()) + .add("msg", "old job has been cleaned") ); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 40c8cfc40b..23c8053969 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -186,14 +186,24 @@ public class TransactionState implements Writable { // error replica ids private Set errorReplicas; private CountDownLatch latch; - + // this state need not to be serialized private Map publishVersionTasks; private boolean hasSendTask; private long publishVersionTime = -1; private TransactionStatus preStatus = null; - + private long callbackId = -1; + // In the beforeStateTransform() phase, we will get the callback object through the callbackId, + // and if we get it, we will save it in this variable. + // The main function of this variable is to retain a reference to this callback object. + // In order to prevent in the afterStateTransform() phase the callback object may have been removed + // from the CallbackFactory, resulting in the inability to obtain the object, causing some bugs + // such as + // 1. the write lock of callback object has been called in beforeStateTransform() + // 2. callback object has been removed from CallbackFactory + // 3. in afterStateTransform(), callback object can not be found, so the write lock can not be released. + private TxnStateChangeCallback callback = null; private long timeoutMs = Config.stream_load_default_timeout_second; // is set to true, we will double the publish timeout @@ -201,7 +211,7 @@ public class TransactionState implements Writable { // optional private TxnCommitAttachment txnCommitAttachment; - + // this map should be set when load execution begin, so that when the txn commit, it will know // which tables and rollups it loaded. // tbl id -> (index ids) @@ -366,8 +376,7 @@ public class TransactionState implements Writable { public void beforeStateTransform(TransactionStatus transactionStatus) throws TransactionException { // before status changed - TxnStateChangeCallback callback = Catalog.getCurrentGlobalTransactionMgr() - .getCallbackFactory().getCallback(callbackId); + callback = Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().getCallback(callbackId); if (callback != null) { switch (transactionStatus) { case ABORTED: @@ -398,8 +407,9 @@ public class TransactionState implements Writable { public void afterStateTransform(TransactionStatus transactionStatus, boolean txnOperated, String txnStatusChangeReason) throws UserException { // after status changed - TxnStateChangeCallback callback = Catalog.getCurrentGlobalTransactionMgr() - .getCallbackFactory().getCallback(callbackId); + if (callback == null) { + callback = Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().getCallback(callbackId); + } if (callback != null) { switch (transactionStatus) { case ABORTED: