From 086bb82fd2ee2c4f574ed2379f192619fe045531 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 3 Dec 2019 16:02:50 +0800 Subject: [PATCH] Fixed a bug that Load job's state is incorrect when upgrading from 0.10.x to 0.11.x (#2356) There is a bug in Doris version 0.10.x. When a load job in PENDING or LOADING state was replayed from the image (not through the edit log), we forgot to add the corresponding callback id to the CallbackFactory. As a result, the subsequent finish-txn edit logs cannot properly finish the job during the replay process. Consequently, when the FE restarts, these load jobs that should have been completed re-enter the PENDING state, resulting in load tasks being submitted repeatedly. Those wrong images are unrecoverable, so we have to reset all load jobs in PENDING or LOADING state when restarting the FE, depending on the status of each job's corresponding txn, to avoid submitting jobs repeatedly. If the corresponding txn exists, the load job's state is set according to the txn's status. If the txn does not exist, it may have been removed due to label expiration, so we do not know whether the txn was aborted or became visible. In that case we have to set the job's state to UNKNOWN, which needs to be handled manually. 
--- .../org/apache/doris/http/HttpServer.java | 2 +- .../java/org/apache/doris/load/LoadJob.java | 1 + .../apache/doris/load/loadv2/JobState.java | 1 + .../org/apache/doris/load/loadv2/LoadJob.java | 16 ++++-- .../apache/doris/load/loadv2/LoadManager.java | 54 ++++++++++++++++++- .../transaction/GlobalTransactionMgr.java | 18 ++++++- .../apache/doris/http/DorisHttpTestCase.java | 2 +- 7 files changed, 86 insertions(+), 8 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/http/HttpServer.java b/fe/src/main/java/org/apache/doris/http/HttpServer.java index cecdf6072f..b52c480bf4 100644 --- a/fe/src/main/java/org/apache/doris/http/HttpServer.java +++ b/fe/src/main/java/org/apache/doris/http/HttpServer.java @@ -203,7 +203,7 @@ public class HttpServer { .childHandler(new PaloHttpServerInitializer()); Channel ch = serverBootstrap.bind(port).sync().channel(); ch.closeFuture().sync(); - + LOG.info("HttpServer started with port: {}", port); } catch (Exception e) { LOG.error("Fail to start FE query http server[port: " + port + "] ", e); System.exit(-1); diff --git a/fe/src/main/java/org/apache/doris/load/LoadJob.java b/fe/src/main/java/org/apache/doris/load/LoadJob.java index d8c288f7f2..9516b10558 100644 --- a/fe/src/main/java/org/apache/doris/load/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/LoadJob.java @@ -63,6 +63,7 @@ public class LoadJob implements Writable { // QUORUM_FINISHED state is internal // To user, it should be transformed to FINISHED public enum JobState { + UNKNOWN, // only for show load state value check, details, see LoadJobV2's JobState PENDING, ETL, LOADING, diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java b/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java index 777992157a..7f1d78821a 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java @@ -19,6 +19,7 @@ package org.apache.doris.load.loadv2; // JobState will be 
persisted in meta data by name, so the order of these state is not important public enum JobState { + UNKNOWN, // this is only for ISSUE #2354 PENDING, // init state LOADING, // job is running COMMITTED, // transaction is committed but not visible diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index fa42fcf332..9e3ef12850 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -293,9 +293,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements return state == JobState.COMMITTED || state == JobState.FINISHED || state == JobState.CANCELLED; } - // return true if job is done(FINISHED/CANCELLED) + // return true if job is done(FINISHED/CANCELLED/UNKNOWN) public boolean isCompleted() { - return state == JobState.FINISHED || state == JobState.CANCELLED; + return state == JobState.FINISHED || state == JobState.CANCELLED || state == JobState.UNKNOWN; } protected void setJobProperties(Map properties) throws DdlException { @@ -430,6 +430,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements protected void unprotectedUpdateState(JobState jobState) { switch (jobState) { + case UNKNOWN: + executeUnknown(); + break; case LOADING: executeLoad(); break; @@ -444,6 +447,13 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements } } + private void executeUnknown() { + // set finished timestamp to create timestamp, so that this unknown job + // can be remove due to label expiration so soon as possible + finishTimestamp = createTimestamp; + state = JobState.UNKNOWN; + } + private void executeLoad() { loadStartTimestamp = System.currentTimeMillis(); state = JobState.LOADING; @@ -557,7 +567,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements protected void unprotectedExecuteCancel(FailMsg failMsg, 
boolean abortTxn) { LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) .add("transaction_id", transactionId) - .add("error_msg", "Failed to execute load with error " + failMsg.getMsg()) + .add("error_msg", "Failed to execute load with error: " + failMsg.getMsg()) .build()); // clean the loadingStatus diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 2edfecf09d..d510d1070f 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -35,6 +35,7 @@ import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; +import org.apache.doris.load.FailMsg.CancelType; import org.apache.doris.load.Load; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TMiniLoadBeginRequest; @@ -42,6 +43,7 @@ import org.apache.doris.thrift.TMiniLoadRequest; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -596,18 +598,68 @@ public class LoadManager implements Writable{ // So here we will check each LOADING load jobs' txn status, if it is COMMITTED, change load job's // state to COMMITTED. // this method should be removed at next upgrading. + // only mini load job will be in LOADING state when persist, because mini load job is executed before writing + // edit log. public void transferLoadingStateToCommitted(GlobalTransactionMgr txnMgr) { for (LoadJob job : idToLoadJob.values()) { if (job.getState() == JobState.LOADING) { // unfortunately, transaction id in load job is also not persisted, so we have to traverse // all transactions to find it. 
- TransactionState txn = txnMgr.getCommittedTransactionStateByCallbackId(job.getCallbackId()); + TransactionState txn = txnMgr.getTransactionStateByCallbackIdAndStatus(job.getCallbackId(), + Sets.newHashSet(TransactionStatus.COMMITTED)); if (txn != null) { job.updateState(JobState.COMMITTED); LOG.info("transfer load job {} state from LOADING to COMMITTED, because txn {} is COMMITTED", job.getId(), txn.getTransactionId()); + continue; } } + + /* + * There is a bug in Doris version 0.10.15. When a load job in PENDING or LOADING + * state was replayed from image (not through the edit log), we forgot to add + * the corresponding callback id in the CallbackFactory. As a result, the + * subsequent finish txn edit logs cannot properly finish the job during the + * replay process. Consequently, when the FE restarts, these load jobs + * that should have been completed re-enter the PENDING state, + * resulting in load tasks being submitted repeatedly. + * + * Those wrong images are unrecoverable, so we have to reset all load jobs + * in PENDING or LOADING state when restarting FE, according to the status of + * the corresponding txn, to avoid submitting jobs repeatedly. + * + * This code can be removed when upgrading from 0.11.x to a future version. 
+ */ + if (job.getState() == JobState.LOADING || job.getState() == JobState.PENDING) { + JobState prevState = job.getState(); + TransactionState txn = txnMgr.getTransactionStateByCallbackId(job.getCallbackId()); + if (txn != null) { + // the txn has already been committed or visible, change job's state to committed or finished + if (txn.getTransactionStatus() == TransactionStatus.COMMITTED) { + job.updateState(JobState.COMMITTED); + LOG.info("transfer load job {} state from {} to COMMITTED, because txn {} is COMMITTED", + job.getId(), prevState, txn.getTransactionId()); + } else if (txn.getTransactionStatus() == TransactionStatus.VISIBLE) { + job.updateState(JobState.FINISHED); + LOG.info("transfer load job {} state from {} to FINISHED, because txn {} is VISIBLE", + job.getId(), prevState, txn.getTransactionId()); + } else if (txn.getTransactionStatus() == TransactionStatus.ABORTED) { + job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL), false, false); + LOG.info("transfer load job {} state from {} to CANCELLED, because txn {} is ABORTED", + job.getId(), prevState, txn.getTransactionId()); + } else { + // pending txn, do nothing + } + continue; + } + + // unfortunately, the txn may have already been removed due to label expiration, so we don't know the txn's + // status. So we have to set the job's state to UNKNOWN, to avoid the user loading the same data twice. + job.updateState(JobState.UNKNOWN); + job.failMsg = new FailMsg(CancelType.UNKNOWN, "transaction status is unknown"); + LOG.info("finish load job {} from {} to UNKNOWN, because transaction status is unknown. 
label: {}", + job.getId(), prevState, job.getLabel()); + } } } diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 3b537a2749..6ec041ef87 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -1346,11 +1346,25 @@ public class GlobalTransactionMgr implements Writable { idGenerator.readFields(in); } - public TransactionState getCommittedTransactionStateByCallbackId(long callbackId) { + public TransactionState getTransactionStateByCallbackIdAndStatus(long callbackId, Set status) { readLock(); try { for (TransactionState txn : idToTransactionState.values()) { - if (txn.getCallbackId() == callbackId && txn.getTransactionStatus() == TransactionStatus.COMMITTED) { + if (txn.getCallbackId() == callbackId && status.contains(txn.getTransactionStatus())) { + return txn; + } + } + } finally { + readUnlock(); + } + return null; + } + + public TransactionState getTransactionStateByCallbackId(long callbackId) { + readLock(); + try { + for (TransactionState txn : idToTransactionState.values()) { + if (txn.getCallbackId() == callbackId) { return txn; } } diff --git a/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java b/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java index 392005f6e2..57040cdc5f 100644 --- a/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java +++ b/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java @@ -255,7 +255,7 @@ abstract public class DorisHttpTestCase { httpServer.setup(); httpServer.start(); // must ensure the http server started before any unit test - Thread.sleep(500); + Thread.sleep(5000); doSetUp(); }