[fix](mini-load) Remove mini load in LOADING and PENDING state (#8649)
1. Remove some unused code.
2. handle mini load with wrong state
1. For some historical reasons, some mini load jobs in LOADING state have not been cleared.
As a result, new load jobs cannot be committed.
2. If a mini load job is created right before FE restart, the mini load job will be in PENDING state forever.
But it should be removed finally.
This commit is contained in:
@ -36,7 +36,6 @@ import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.LogBuilder;
|
||||
import org.apache.doris.common.util.LogKey;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.load.EtlJobType;
|
||||
import org.apache.doris.load.FailMsg;
|
||||
import org.apache.doris.load.FailMsg.CancelType;
|
||||
@ -45,9 +44,7 @@ import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TMiniLoadBeginRequest;
|
||||
import org.apache.doris.thrift.TMiniLoadRequest;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
import org.apache.doris.transaction.GlobalTransactionMgr;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
import org.apache.doris.transaction.TransactionStatus;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -359,32 +356,6 @@ public class LoadManager implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
|
||||
Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getDbName());
|
||||
|
||||
LoadJob loadJob = null;
|
||||
readLock();
|
||||
try {
|
||||
Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
|
||||
if (labelToLoadJobs == null) {
|
||||
throw new DdlException("Load job does not exist");
|
||||
}
|
||||
List<LoadJob> loadJobList = labelToLoadJobs.get(stmt.getLabel());
|
||||
if (loadJobList == null) {
|
||||
throw new DdlException("Load job does not exist");
|
||||
}
|
||||
Optional<LoadJob> loadJobOptional = loadJobList.stream().filter(entity -> !entity.isTxnDone()).findFirst();
|
||||
if (!loadJobOptional.isPresent()) {
|
||||
throw new DdlException("There is no uncompleted job which label is " + stmt.getLabel());
|
||||
}
|
||||
loadJob = loadJobOptional.get();
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
|
||||
loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"));
|
||||
}
|
||||
|
||||
public void replayEndLoadJob(LoadJobFinalOperation operation) {
|
||||
LoadJob job = idToLoadJob.get(operation.getId());
|
||||
if (job == null) {
|
||||
@ -683,102 +654,6 @@ public class LoadManager implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
// Deprecated in version 0.12
|
||||
// This method is only for bug fix. And should be call after image and edit log are replayed.
|
||||
public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) {
|
||||
for (LoadJob job : idToLoadJob.values()) {
|
||||
/*
|
||||
* Bug 1:
|
||||
* in previous implementation, there is a bug that when the job's corresponding transaction is
|
||||
* COMMITTED but not VISIBLE, the load job's state is LOADING, so that the job may be CANCELLED
|
||||
* by timeout checker, which is not right.
|
||||
* So here we will check each LOADING load jobs' txn status, if it is COMMITTED, change load job's
|
||||
* state to COMMITTED.
|
||||
* this method should be removed at next upgrading.
|
||||
* only mini load job will be in LOADING state when persist, because mini load job is executed before writing
|
||||
* edit log.
|
||||
*/
|
||||
if (job.getState() == JobState.LOADING) {
|
||||
// unfortunately, transaction id in load job is also not persisted, so we have to traverse
|
||||
// all transactions to find it.
|
||||
TransactionState txn = txnMgr.getTransactionStateByCallbackIdAndStatus(job.getDbId(), job.getCallbackId(),
|
||||
Sets.newHashSet(TransactionStatus.COMMITTED));
|
||||
if (txn != null) {
|
||||
job.updateState(JobState.COMMITTED);
|
||||
LOG.info("transfer load job {} state from LOADING to COMMITTED, because txn {} is COMMITTED."
|
||||
+ " label: {}, db: {}", job.getId(), txn.getTransactionId(), job.getLabel(), job.getDbId());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Bug 2:
|
||||
* There is bug in Doris version 0.10.15. When a load job in PENDING or LOADING
|
||||
* state was replayed from image (not through the edit log), we forgot to add
|
||||
* the corresponding callback id in the CallbackFactory. As a result, the
|
||||
* subsequent finish txn edit logs cannot properly finish the job during the
|
||||
* replay process. This results in that when the FE restarts, these load jobs
|
||||
* that should have been completed are re-entered into the pending state,
|
||||
* resulting in repeated submission load tasks.
|
||||
*
|
||||
* Those wrong images are unrecoverable, so that we have to cancel all load jobs
|
||||
* in PENDING or LOADING state when restarting FE, to avoid submit jobs
|
||||
* repeatedly.
|
||||
*
|
||||
* This code can be remove when upgrading from 0.11.x to future version.
|
||||
*/
|
||||
if (job.getState() == JobState.LOADING || job.getState() == JobState.PENDING) {
|
||||
JobState prevState = job.getState();
|
||||
TransactionState txn = txnMgr.getTransactionStateByCallbackId(job.getDbId(), job.getCallbackId());
|
||||
if (txn != null) {
|
||||
// the txn has already been committed or visible, change job's state to committed or finished
|
||||
if (txn.getTransactionStatus() == TransactionStatus.COMMITTED) {
|
||||
job.updateState(JobState.COMMITTED);
|
||||
LOG.info("transfer load job {} state from {} to COMMITTED, because txn {} is COMMITTED",
|
||||
job.getId(), prevState, txn.getTransactionId());
|
||||
} else if (txn.getTransactionStatus() == TransactionStatus.VISIBLE) {
|
||||
job.updateState(JobState.FINISHED);
|
||||
LOG.info("transfer load job {} state from {} to FINISHED, because txn {} is VISIBLE",
|
||||
job.getId(), prevState, txn.getTransactionId());
|
||||
} else if (txn.getTransactionStatus() == TransactionStatus.ABORTED) {
|
||||
job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false);
|
||||
LOG.info("transfer load job {} state from {} to CANCELLED, because txn {} is ABORTED",
|
||||
job.getId(), prevState, txn.getTransactionId());
|
||||
} else {
|
||||
// pending txn, do nothing
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (job.getJobType() == EtlJobType.MINI) {
|
||||
// for mini load job, just set it as CANCELLED, because mini load is a synchronous load.
|
||||
// it would be failed if FE restart.
|
||||
job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false);
|
||||
LOG.info("transfer mini load job {} state from {} to CANCELLED, because transaction status is unknown"
|
||||
+ ". label: {}, db: {}",
|
||||
job.getId(), prevState, job.getLabel(), job.getDbId());
|
||||
} else {
|
||||
// txn is not found. here are 2 cases:
|
||||
// 1. txn is not start yet, so we can just set job to CANCELLED, and user need to submit the job again.
|
||||
// 2. because of the bug, txn is ABORTED of VISIBLE, and job is not finished. and txn is expired and
|
||||
// be removed from transaction manager. So we don't know this job is finished or cancelled.
|
||||
// in this case, the job should has been submitted long ago (otherwise the txn could not have been
|
||||
// removed by expiration).
|
||||
// Without affecting the first case of job, we set the job finish time to be the same as the create time.
|
||||
// In this way, the second type of job will be automatically cleared after running removeOldLoadJob();
|
||||
|
||||
// use CancelType.UNKNOWN, so that we can set finish time to be the same as the create time
|
||||
job.cancelJobWithoutCheck(new FailMsg(CancelType.TXN_UNKNOWN, "transaction status is unknown"), false, false);
|
||||
LOG.info("finish load job {} from {} to CANCELLED, because transaction status is unknown. label: {}, db: {}, create: {}",
|
||||
job.getId(), prevState, job.getLabel(), job.getDbId(), TimeUtils.longToTimeString(job.getCreateTimestamp()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
removeOldLoadJob();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
long currentTimeMs = System.currentTimeMillis();
|
||||
@ -798,6 +673,27 @@ public class LoadManager implements Writable {
|
||||
if (loadJob.isExpired(currentTimeMs)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (loadJob.getJobType() == EtlJobType.MINI) {
|
||||
// This is a bug fix. the mini load job should not with state LOADING.
|
||||
if (loadJob.getState() == JobState.LOADING) {
|
||||
LOG.warn("skip mini load job {} in db {} with LOADING state", loadJob.getId(), loadJob.getDbId());
|
||||
continue;
|
||||
}
|
||||
|
||||
if (loadJob.getState() == JobState.PENDING) {
|
||||
// bad case. When a mini load job is created and then FE restart.
|
||||
// the job will be in PENDING state forever.
|
||||
// This is a temp solution to remove these jobs. And the mini load job should be deprecated in Doris v1.1
|
||||
TransactionState state = Catalog.getCurrentCatalog().getGlobalTransactionMgr().getTransactionState(
|
||||
loadJob.getDbId(), loadJob.getTransactionId());
|
||||
if (state == null) {
|
||||
LOG.warn("skip mini load job {} in db {} with PENDING state and with txn: {}",
|
||||
loadJob.getId(), loadJob.getDbId(), loadJob.getTransactionId());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
idToLoadJob.put(loadJob.getId(), loadJob);
|
||||
Map<String, List<LoadJob>> map = dbIdToLabelToLoadJobs.get(loadJob.getDbId());
|
||||
if (map == null) {
|
||||
|
||||
Reference in New Issue
Block a user