diff --git a/fe/src/main/java/org/apache/doris/http/HttpServer.java b/fe/src/main/java/org/apache/doris/http/HttpServer.java
index cecdf6072f..b52c480bf4 100644
--- a/fe/src/main/java/org/apache/doris/http/HttpServer.java
+++ b/fe/src/main/java/org/apache/doris/http/HttpServer.java
@@ -203,7 +203,8 @@ public class HttpServer {
                         .childHandler(new PaloHttpServerInitializer());
                 Channel ch = serverBootstrap.bind(port).sync().channel();
+                // log before blocking on closeFuture(), which returns only after the channel is closed
+                LOG.info("HttpServer started with port: {}", port);
                 ch.closeFuture().sync();
-
             } catch (Exception e) {
                 LOG.error("Fail to start FE query http server[port: " + port + "] ", e);
                 System.exit(-1);
diff --git a/fe/src/main/java/org/apache/doris/load/LoadJob.java b/fe/src/main/java/org/apache/doris/load/LoadJob.java
index d8c288f7f2..9516b10558 100644
--- a/fe/src/main/java/org/apache/doris/load/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/LoadJob.java
@@ -63,6 +63,7 @@ public class LoadJob implements Writable {
     // QUORUM_FINISHED state is internal
     // To user, it should be transformed to FINISHED
     public enum JobState {
+        UNKNOWN, // only for show load state value check; for details, see loadv2's JobState
         PENDING,
         ETL,
         LOADING,
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java b/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java
index 777992157a..7f1d78821a 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java
@@ -19,6 +19,7 @@ package org.apache.doris.load.loadv2;
 
 // JobState will be persisted in meta data by name, so the order of these state is not important
 public enum JobState {
+    UNKNOWN, // this is only for ISSUE #2354
     PENDING, // init state
     LOADING, // job is running
     COMMITTED, // transaction is committed but not visible
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index fa42fcf332..9e3ef12850 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -293,9 +293,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         return state == JobState.COMMITTED || state == JobState.FINISHED || state == JobState.CANCELLED;
     }
 
-    // return true if job is done(FINISHED/CANCELLED)
+    // return true if job is done(FINISHED/CANCELLED/UNKNOWN)
     public boolean isCompleted() {
-        return state == JobState.FINISHED || state == JobState.CANCELLED;
+        return state == JobState.FINISHED || state == JobState.CANCELLED || state == JobState.UNKNOWN;
     }
 
     protected void setJobProperties(Map properties) throws DdlException {
@@ -430,6 +430,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
 
     protected void unprotectedUpdateState(JobState jobState) {
         switch (jobState) {
+            case UNKNOWN:
+                executeUnknown();
+                break;
             case LOADING:
                 executeLoad();
                 break;
@@ -444,6 +447,13 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         }
     }
 
+    private void executeUnknown() {
+        // set the finish timestamp to the create timestamp, so that this unknown job
+        // can be removed by label expiration as soon as possible
+        finishTimestamp = createTimestamp;
+        state = JobState.UNKNOWN;
+    }
+
     private void executeLoad() {
         loadStartTimestamp = System.currentTimeMillis();
         state = JobState.LOADING;
@@ -557,7 +567,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
     protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) {
         LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
                          .add("transaction_id", transactionId)
-                         .add("error_msg", "Failed to execute load with error " + failMsg.getMsg())
+                         .add("error_msg", "Failed to execute load with error: " + failMsg.getMsg())
                          .build());
 
         // clean the loadingStatus
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 2edfecf09d..d510d1070f 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.FailMsg.CancelType;
 import org.apache.doris.load.Load;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TMiniLoadBeginRequest;
@@ -42,6 +43,7 @@ import org.apache.doris.thrift.TMiniLoadRequest;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionStatus;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -596,18 +598,68 @@ public class LoadManager implements Writable{
     // So here we will check each LOADING load jobs' txn status, if it is COMMITTED, change load job's
     // state to COMMITTED.
     // this method should be removed at next upgrading.
+    // only mini load jobs can be in LOADING state when persisted, because a mini load job is executed before
+    // writing the edit log.
     public void transferLoadingStateToCommitted(GlobalTransactionMgr txnMgr) {
         for (LoadJob job : idToLoadJob.values()) {
             if (job.getState() == JobState.LOADING) {
                 // unfortunately, transaction id in load job is also not persisted, so we have to traverse
                 // all transactions to find it.
-                TransactionState txn = txnMgr.getCommittedTransactionStateByCallbackId(job.getCallbackId());
+                TransactionState txn = txnMgr.getTransactionStateByCallbackIdAndStatus(job.getCallbackId(),
+                        Sets.newHashSet(TransactionStatus.COMMITTED));
                 if (txn != null) {
                     job.updateState(JobState.COMMITTED);
                     LOG.info("transfer load job {} state from LOADING to COMMITTED, because txn {} is COMMITTED",
                             job.getId(), txn.getTransactionId());
+                    continue;
                 }
             }
+
+            /*
+             * There is a bug in Doris version 0.10.15. When a load job in PENDING or LOADING
+             * state was replayed from image (not through the edit log), we forgot to add
+             * the corresponding callback id in the CallbackFactory. As a result, the
+             * subsequent finish txn edit logs cannot properly finish the job during the
+             * replay process. Consequently, when the FE restarts, these load jobs that
+             * should have been completed re-enter the pending state, resulting in load
+             * tasks being submitted repeatedly.
+             *
+             * Those wrong images are unrecoverable, so when restarting FE we have to settle
+             * load jobs in PENDING or LOADING state according to their txn status (UNKNOWN
+             * if the txn no longer exists), to avoid submitting jobs repeatedly.
+             *
+             * This code can be removed when upgrading from 0.11.x to a future version.
+             */
+            if (job.getState() == JobState.LOADING || job.getState() == JobState.PENDING) {
+                JobState prevState = job.getState();
+                TransactionState txn = txnMgr.getTransactionStateByCallbackId(job.getCallbackId());
+                if (txn != null) {
+                    // the txn still exists, so make the job state follow the txn status
+                    if (txn.getTransactionStatus() == TransactionStatus.COMMITTED) {
+                        job.updateState(JobState.COMMITTED);
+                        LOG.info("transfer load job {} state from {} to COMMITTED, because txn {} is COMMITTED",
+                                job.getId(), prevState, txn.getTransactionId());
+                    } else if (txn.getTransactionStatus() == TransactionStatus.VISIBLE) {
+                        job.updateState(JobState.FINISHED);
+                        LOG.info("transfer load job {} state from {} to FINISHED, because txn {} is VISIBLE",
+                                job.getId(), prevState, txn.getTransactionId());
+                    } else if (txn.getTransactionStatus() == TransactionStatus.ABORTED) {
+                        job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL), false, false);
+                        LOG.info("transfer load job {} state from {} to CANCELLED, because txn {} is ABORTED",
+                                job.getId(), prevState, txn.getTransactionId());
+                    } else {
+                        // pending txn, do nothing
+                    }
+                    continue;
+                }
+
+                // unfortunately, the txn may have already been removed due to label expiration, so we don't know
+                // its status. So we mark the job as UNKNOWN (a completed state), to avoid loading the same data twice.
+                job.updateState(JobState.UNKNOWN);
+                job.failMsg = new FailMsg(CancelType.UNKNOWN, "transaction status is unknown");
+                LOG.info("finish load job {} from {} to UNKNOWN, because transaction status is unknown. label: {}",
+                        job.getId(), prevState, job.getLabel());
+            }
         }
     }
 
diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 3b537a2749..6ec041ef87 100644
--- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -1346,11 +1346,25 @@ public class GlobalTransactionMgr implements Writable {
         idGenerator.readFields(in);
     }
 
-    public TransactionState getCommittedTransactionStateByCallbackId(long callbackId) {
+    public TransactionState getTransactionStateByCallbackIdAndStatus(long callbackId, Set<TransactionStatus> status) {
         readLock();
         try {
             for (TransactionState txn : idToTransactionState.values()) {
-                if (txn.getCallbackId() == callbackId && txn.getTransactionStatus() == TransactionStatus.COMMITTED) {
+                if (txn.getCallbackId() == callbackId && status.contains(txn.getTransactionStatus())) {
+                    return txn;
+                }
+            }
+        } finally {
+            readUnlock();
+        }
+        return null;
+    }
+
+    public TransactionState getTransactionStateByCallbackId(long callbackId) {
+        readLock();
+        try {
+            for (TransactionState txn : idToTransactionState.values()) {
+                if (txn.getCallbackId() == callbackId) {
                     return txn;
                 }
             }
diff --git a/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java b/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
index 392005f6e2..57040cdc5f 100644
--- a/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
+++ b/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
@@ -255,7 +255,7 @@ abstract public class DorisHttpTestCase {
         httpServer.setup();
         httpServer.start();
         // must ensure the http server started before any unit test
-        Thread.sleep(500);
+        Thread.sleep(5000);
         doSetUp();
     }
 