Fixed a bug that Load job's state is incorrect when upgrading from 0.10.x to 0.11.x (#2356)
There is bug in Doris version 0.10.x. When a load job in PENDING or LOADING state was replayed from image (not through the edit log), we forgot to add the corresponding callback id in the CallbackFactory. As a result, the subsequent finish txn edit logs cannot properly finish the job during the replay process. This results in that when the FE restarts, these load jobs that should have been completed are re-entered into the pending state, resulting in repeated submission load tasks. Those wrong images are unrecoverable, so that we have to reset all load jobs in PENDING or LOADING state when restarting FE, depends on its corresponding txn's status, to avoid submit jobs repeatedly. If corresponding txn exist, set load job' state depends on txn's status. If txn does not exist, may be the txn has been removed due to label expiration. So that we don't know the txn is aborted or visible. So we have to set the job's state as UNKNOWN, which need handle it manually.
This commit is contained in:
@ -203,7 +203,7 @@ public class HttpServer {
|
||||
.childHandler(new PaloHttpServerInitializer());
|
||||
Channel ch = serverBootstrap.bind(port).sync().channel();
|
||||
ch.closeFuture().sync();
|
||||
|
||||
LOG.info("HttpServer started with port: {}", port);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Fail to start FE query http server[port: " + port + "] ", e);
|
||||
System.exit(-1);
|
||||
|
||||
@ -63,6 +63,7 @@ public class LoadJob implements Writable {
|
||||
// QUORUM_FINISHED state is internal
|
||||
// To user, it should be transformed to FINISHED
|
||||
public enum JobState {
|
||||
UNKNOWN, // only for show load state value check, details, see LoadJobV2's JobState
|
||||
PENDING,
|
||||
ETL,
|
||||
LOADING,
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.load.loadv2;
|
||||
|
||||
// JobState will be persisted in meta data by name, so the order of these state is not important
|
||||
public enum JobState {
|
||||
UNKNOWN, // this is only for ISSUE #2354
|
||||
PENDING, // init state
|
||||
LOADING, // job is running
|
||||
COMMITTED, // transaction is committed but not visible
|
||||
|
||||
@ -293,9 +293,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
|
||||
return state == JobState.COMMITTED || state == JobState.FINISHED || state == JobState.CANCELLED;
|
||||
}
|
||||
|
||||
// return true if job is done(FINISHED/CANCELLED)
|
||||
// return true if job is done(FINISHED/CANCELLED/UNKNOWN)
|
||||
public boolean isCompleted() {
|
||||
return state == JobState.FINISHED || state == JobState.CANCELLED;
|
||||
return state == JobState.FINISHED || state == JobState.CANCELLED || state == JobState.UNKNOWN;
|
||||
}
|
||||
|
||||
protected void setJobProperties(Map<String, String> properties) throws DdlException {
|
||||
@ -430,6 +430,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
|
||||
|
||||
protected void unprotectedUpdateState(JobState jobState) {
|
||||
switch (jobState) {
|
||||
case UNKNOWN:
|
||||
executeUnknown();
|
||||
break;
|
||||
case LOADING:
|
||||
executeLoad();
|
||||
break;
|
||||
@ -444,6 +447,13 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
|
||||
}
|
||||
}
|
||||
|
||||
private void executeUnknown() {
|
||||
// set finished timestamp to create timestamp, so that this unknown job
|
||||
// can be remove due to label expiration so soon as possible
|
||||
finishTimestamp = createTimestamp;
|
||||
state = JobState.UNKNOWN;
|
||||
}
|
||||
|
||||
private void executeLoad() {
|
||||
loadStartTimestamp = System.currentTimeMillis();
|
||||
state = JobState.LOADING;
|
||||
@ -557,7 +567,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
|
||||
protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) {
|
||||
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
|
||||
.add("transaction_id", transactionId)
|
||||
.add("error_msg", "Failed to execute load with error " + failMsg.getMsg())
|
||||
.add("error_msg", "Failed to execute load with error: " + failMsg.getMsg())
|
||||
.build());
|
||||
|
||||
// clean the loadingStatus
|
||||
|
||||
@ -35,6 +35,7 @@ import org.apache.doris.common.util.LogBuilder;
|
||||
import org.apache.doris.common.util.LogKey;
|
||||
import org.apache.doris.load.EtlJobType;
|
||||
import org.apache.doris.load.FailMsg;
|
||||
import org.apache.doris.load.FailMsg.CancelType;
|
||||
import org.apache.doris.load.Load;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TMiniLoadBeginRequest;
|
||||
@ -42,6 +43,7 @@ import org.apache.doris.thrift.TMiniLoadRequest;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
import org.apache.doris.transaction.GlobalTransactionMgr;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
import org.apache.doris.transaction.TransactionStatus;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -596,18 +598,68 @@ public class LoadManager implements Writable{
|
||||
// So here we will check each LOADING load jobs' txn status, if it is COMMITTED, change load job's
|
||||
// state to COMMITTED.
|
||||
// this method should be removed at next upgrading.
|
||||
// only mini load job will be in LOADING state when persist, because mini load job is executed before writing
|
||||
// edit log.
|
||||
public void transferLoadingStateToCommitted(GlobalTransactionMgr txnMgr) {
|
||||
for (LoadJob job : idToLoadJob.values()) {
|
||||
if (job.getState() == JobState.LOADING) {
|
||||
// unfortunately, transaction id in load job is also not persisted, so we have to traverse
|
||||
// all transactions to find it.
|
||||
TransactionState txn = txnMgr.getCommittedTransactionStateByCallbackId(job.getCallbackId());
|
||||
TransactionState txn = txnMgr.getTransactionStateByCallbackIdAndStatus(job.getCallbackId(),
|
||||
Sets.newHashSet(TransactionStatus.COMMITTED));
|
||||
if (txn != null) {
|
||||
job.updateState(JobState.COMMITTED);
|
||||
LOG.info("transfer load job {} state from LOADING to COMMITTED, because txn {} is COMMITTED",
|
||||
job.getId(), txn.getTransactionId());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* There is bug in Doris version 0.10.15. When a load job in PENDING or LOADING
|
||||
* state was replayed from image (not through the edit log), we forgot to add
|
||||
* the corresponding callback id in the CallbackFactory. As a result, the
|
||||
* subsequent finish txn edit logs cannot properly finish the job during the
|
||||
* replay process. This results in that when the FE restarts, these load jobs
|
||||
* that should have been completed are re-entered into the pending state,
|
||||
* resulting in repeated submission load tasks.
|
||||
*
|
||||
* Those wrong images are unrecoverable, so that we have to cancel all load jobs
|
||||
* in PENDING or LOADING state when restarting FE, to avoid submit jobs
|
||||
* repeatedly.
|
||||
*
|
||||
* This code can be remove when upgrading from 0.11.x to future version.
|
||||
*/
|
||||
if (job.getState() == JobState.LOADING || job.getState() == JobState.PENDING) {
|
||||
JobState prevState = job.getState();
|
||||
TransactionState txn = txnMgr.getTransactionStateByCallbackId(job.getCallbackId());
|
||||
if (txn != null) {
|
||||
// the txn has already been committed or visible, change job's state to committed or finished
|
||||
if (txn.getTransactionStatus() == TransactionStatus.COMMITTED) {
|
||||
job.updateState(JobState.COMMITTED);
|
||||
LOG.info("transfer load job {} state from {} to COMMITTED, because txn {} is COMMITTED",
|
||||
job.getId(), prevState, txn.getTransactionId());
|
||||
} else if (txn.getTransactionStatus() == TransactionStatus.VISIBLE) {
|
||||
job.updateState(JobState.FINISHED);
|
||||
LOG.info("transfer load job {} state from {} to FINISHED, because txn {} is VISIBLE",
|
||||
job.getId(), prevState, txn.getTransactionId());
|
||||
} else if (txn.getTransactionStatus() == TransactionStatus.ABORTED) {
|
||||
job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL), false, false);
|
||||
LOG.info("transfer load job {} state from {} to CANCELLED, because txn {} is ABORTED",
|
||||
job.getId(), prevState, txn.getTransactionId());
|
||||
} else {
|
||||
// pending txn, do nothing
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// unfortunately, the txn may already been removed due to label expired, so we don't know the txn's
|
||||
// status. But we have to set the job as FINISHED, to void user load same data twice.
|
||||
job.updateState(JobState.UNKNOWN);
|
||||
job.failMsg = new FailMsg(CancelType.UNKNOWN, "transaction status is unknown");
|
||||
LOG.info("finish load job {} from {} to UNKNOWN, because transaction status is unknown. label: {}",
|
||||
job.getId(), prevState, job.getLabel());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1346,11 +1346,25 @@ public class GlobalTransactionMgr implements Writable {
|
||||
idGenerator.readFields(in);
|
||||
}
|
||||
|
||||
public TransactionState getCommittedTransactionStateByCallbackId(long callbackId) {
|
||||
public TransactionState getTransactionStateByCallbackIdAndStatus(long callbackId, Set<TransactionStatus> status) {
|
||||
readLock();
|
||||
try {
|
||||
for (TransactionState txn : idToTransactionState.values()) {
|
||||
if (txn.getCallbackId() == callbackId && txn.getTransactionStatus() == TransactionStatus.COMMITTED) {
|
||||
if (txn.getCallbackId() == callbackId && status.contains(txn.getTransactionStatus())) {
|
||||
return txn;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public TransactionState getTransactionStateByCallbackId(long callbackId) {
|
||||
readLock();
|
||||
try {
|
||||
for (TransactionState txn : idToTransactionState.values()) {
|
||||
if (txn.getCallbackId() == callbackId) {
|
||||
return txn;
|
||||
}
|
||||
}
|
||||
|
||||
@ -255,7 +255,7 @@ abstract public class DorisHttpTestCase {
|
||||
httpServer.setup();
|
||||
httpServer.start();
|
||||
// must ensure the http server started before any unit test
|
||||
Thread.sleep(500);
|
||||
Thread.sleep(5000);
|
||||
doSetUp();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user