[Bug] Fix bug where a routine load job may cause a deadlock (#6058)

To make sure the routine load job's lock is released after the txn is aborted.
This commit is contained in:
Mingyu Chen
2021-06-20 16:14:47 +08:00
committed by GitHub
parent fe0912f6e5
commit 1d796d9aa4
2 changed files with 21 additions and 11 deletions

View File

@ -511,10 +511,10 @@ public class RoutineLoadManager implements Writable {
routineLoadJob.getState());
Catalog.getCurrentCatalog().getEditLog().logRemoveRoutineLoadJob(operation);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("end_timestamp", routineLoadJob.getEndTimestamp())
.add("current_timestamp", currentTimestamp)
.add("job_state", routineLoadJob.getState())
.add("msg", "old job has been cleaned")
.add("end_timestamp", routineLoadJob.getEndTimestamp())
.add("current_timestamp", currentTimestamp)
.add("job_state", routineLoadJob.getState())
.add("msg", "old job has been cleaned")
);
}
}

View File

@ -186,14 +186,24 @@ public class TransactionState implements Writable {
// error replica ids
private Set<Long> errorReplicas;
private CountDownLatch latch;
// this state does not need to be serialized
private Map<Long, PublishVersionTask> publishVersionTasks;
private boolean hasSendTask;
private long publishVersionTime = -1;
private TransactionStatus preStatus = null;
private long callbackId = -1;
// In the beforeStateTransform() phase, we look up the callback object by its callbackId,
// and if it is found, we save it in this variable.
// The purpose of this variable is to retain a reference to the callback object,
// so that afterStateTransform() can still reach it even if it has since been removed
// from the CallbackFactory. Without this reference, bugs such as the following can occur:
// 1. the write lock of the callback object is acquired in beforeStateTransform()
// 2. the callback object is removed from the CallbackFactory
// 3. in afterStateTransform(), the callback object cannot be found, so the write lock is never released.
private TxnStateChangeCallback callback = null;
private long timeoutMs = Config.stream_load_default_timeout_second;
// is set to true, we will double the publish timeout
@ -201,7 +211,7 @@ public class TransactionState implements Writable {
// optional
private TxnCommitAttachment txnCommitAttachment;
// this map should be set when load execution begins, so that when the txn commits, it will know
// which tables and rollups it loaded.
// tbl id -> (index ids)
@ -366,8 +376,7 @@ public class TransactionState implements Writable {
public void beforeStateTransform(TransactionStatus transactionStatus) throws TransactionException {
// before status changed
TxnStateChangeCallback callback = Catalog.getCurrentGlobalTransactionMgr()
.getCallbackFactory().getCallback(callbackId);
callback = Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().getCallback(callbackId);
if (callback != null) {
switch (transactionStatus) {
case ABORTED:
@ -398,8 +407,9 @@ public class TransactionState implements Writable {
public void afterStateTransform(TransactionStatus transactionStatus, boolean txnOperated, String txnStatusChangeReason)
throws UserException {
// after status changed
TxnStateChangeCallback callback = Catalog.getCurrentGlobalTransactionMgr()
.getCallbackFactory().getCallback(callbackId);
if (callback == null) {
callback = Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().getCallback(callbackId);
}
if (callback != null) {
switch (transactionStatus) {
case ABORTED: