From 4f39d405eeff769bb37a864cdf37501feeb1386a Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Thu, 5 Dec 2019 13:41:04 +0800 Subject: [PATCH] Fix some load bugs (#2384) For #2383 1. Limit the concurrent transactions of routine load job 2. Create new routine load task when txn is VISIBLE, not after COMMITTED. For #2267 1. All non-master daemon thread should also be started after catalog is ready. For #2354 1. `fixLoadJobMetaError()` should be called after all meta data is read, including image and edit logs. 2. Mini load job should set to CANCELLED when corresponding transaction is not found, instead of UNKNOWN. --- .../apache/doris/alter/SchemaChangeJob.java | 4 +- .../analysis/ShowRoutineLoadTaskStmt.java | 1 + .../org/apache/doris/catalog/Catalog.java | 33 +++++++- .../apache/doris/catalog/DomainResolver.java | 17 +--- .../apache/doris/catalog/TabletStatMgr.java | 18 +---- .../java/org/apache/doris/common/Config.java | 1 - .../apache/doris/external/EsStateStore.java | 36 +++++---- .../java/org/apache/doris/load/FailMsg.java | 3 +- .../org/apache/doris/load/loadv2/LoadJob.java | 18 +++-- .../apache/doris/load/loadv2/LoadManager.java | 62 +++++++++----- .../apache/doris/load/loadv2/MiniLoadJob.java | 10 +++ .../load/routineload/RoutineLoadJob.java | 81 +++++++++++++++---- .../load/routineload/RoutineLoadTaskInfo.java | 19 +++++ .../org/apache/doris/master/Checkpoint.java | 2 + .../apache/doris/planner/BrokerScanNode.java | 44 ++++------ .../transaction/GlobalTransactionMgr.java | 2 +- .../apache/doris/http/DorisHttpTestCase.java | 2 +- .../doris/mysql/privilege/AuthTest.java | 14 ++-- 18 files changed, 236 insertions(+), 131 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java index 726bd153cc..03bd751daa 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java @@ -1044,7 +1044,9 @@ public class SchemaChangeJob extends AlterJob { db.writeLock(); try { OlapTable olapTable = (OlapTable) db.getTable(tableId); - olapTable.setState(OlapTableState.NORMAL); + if (olapTable != null) { + olapTable.setState(OlapTableState.NORMAL); + } } finally { db.writeUnlock(); } diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java index e826debccc..d280704d9d 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java @@ -42,6 +42,7 @@ public class ShowRoutineLoadTaskStmt extends ShowStmt { new ImmutableList.Builder() .add("TaskId") .add("TxnId") + .add("TxnStatus") .add("JobId") .add("CreateTime") .add("ExecuteStartTime") diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index bfba9ab84b..dab508474b 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -272,7 +272,7 @@ public class Catalog { private BackupHandler backupHandler; private PublishVersionDaemon publishVersionDaemon; - private Daemon labelCleaner; // To clean old LabelInfo, ExportJobInfos + private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos private MasterDaemon txnCleaner; // To clean aborted or timeout txns private Daemon replayer; private Daemon timePrinter; @@ -1086,6 +1086,11 @@ public class Catalog { MasterInfo info = new MasterInfo(this.masterIp, this.masterHttpPort, this.masterRpcPort); editLog.logMasterInfo(info); + // for master, the 'isReady' is set behind. + // but we are sure that all metadata is replayed if we get here. + // so no need to check 'isReady' flag in this method + fixBugAfterMetadataReplayed(false); + // start all daemon threads that only running on MASTER FE startMasterOnlyDaemonThreads(); // start other daemon threads that should running on all FE @@ -1101,6 +1106,24 @@ public class Catalog { LOG.info(msg); } + /* + * Add anything necessary here if there is meta data need to be fixed. + */ + public void fixBugAfterMetadataReplayed(boolean waitCatalogReady) { + if (waitCatalogReady) { + while (!isReady()) { + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + LOG.info("start to fix meta data bug"); + loadManager.fixLoadJobMetaBugs(globalTransactionMgr); + } + // start all daemon threads only running on Master private void startMasterOnlyDaemonThreads() { // start checkpoint thread @@ -1194,6 +1217,9 @@ public class Catalog { replayer.start(); } + // 'isReady' will be set to true in 'setCanRead()' method + fixBugAfterMetadataReplayed(true); + startNonMasterDaemonThreads(); MetricRepo.init(); @@ -1720,7 +1746,6 @@ public class Catalog { public long loadLoadJobsV2(DataInputStream in, long checksum) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_50) { loadManager.readFields(in); - loadManager.transferLoadingStateToCommitted(globalTransactionMgr); } return checksum; } @@ -2055,9 +2080,9 @@ public class Catalog { } public void createLabelCleaner() { - labelCleaner = new Daemon("LoadLabelCleaner", Config.label_clean_interval_second * 1000L) { + labelCleaner = new MasterDaemon("LoadLabelCleaner", Config.label_clean_interval_second * 1000L) { @Override - protected void runOneCycle() { + protected void runAfterCatalogReady() { load.removeOldLoadJobs(); load.removeOldDeleteJobs(); loadManager.removeOldLoadJob(); diff --git a/fe/src/main/java/org/apache/doris/catalog/DomainResolver.java b/fe/src/main/java/org/apache/doris/catalog/DomainResolver.java index 6aaff5d3db..214be4d8df 100644 --- a/fe/src/main/java/org/apache/doris/catalog/DomainResolver.java +++ b/fe/src/main/java/org/apache/doris/catalog/DomainResolver.java @@ -17,7 +17,7 @@ package org.apache.doris.catalog; -import org.apache.doris.common.util.Daemon; +import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.mysql.privilege.PaloAuth; import com.google.common.base.Strings; @@ -35,30 +35,21 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -public class DomainResolver extends Daemon { +public class DomainResolver extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(DomainResolver.class); private static final String BNS_RESOLVER_TOOLS_PATH = "/usr/bin/get_instance_by_service"; private PaloAuth auth; - private AtomicBoolean isStart = new AtomicBoolean(false); - public DomainResolver(PaloAuth auth) { super("domain resolver", 10 * 1000); this.auth = auth; } + // 'public' for test @Override - public synchronized void start() { - if (isStart.compareAndSet(false, true)) { - super.start(); - } - } - - @Override - public void runOneCycle() { + public void runAfterCatalogReady() { // qualified user name -> domain name Map> userMap = Maps.newHashMap(); auth.getCopiedWhiteList(userMap); diff --git a/fe/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index 3825c7a0c0..bf29eec9e7 100644 --- a/fe/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -21,7 +21,7 @@ import org.apache.doris.catalog.MaterializedIndex.IndexExtState; import org.apache.doris.catalog.Table.TableType; import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; -import org.apache.doris.common.util.Daemon; +import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.system.Backend; import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TNetworkAddress; @@ -40,7 +40,7 @@ import java.util.Map; * TabletStatMgr is for collecting tablet(replica) statistics from backends. * Each FE will collect by itself. */ -public class TabletStatMgr extends Daemon { +public class TabletStatMgr extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(TabletStatMgr.class); public TabletStatMgr() { @@ -48,19 +48,7 @@ public class TabletStatMgr extends Daemon { } @Override - protected void runOneCycle() { - // We should wait Frontend finished replaying logs, then begin to get tablet status - while (!Catalog.getInstance().isReady()) { - try { - // sleep here, not return. because if we return, we have to wait until next round, which may - // take a long time(default is tablet_stat_update_interval_second: 5 min) - Thread.sleep(10000); - } catch (InterruptedException e) { - LOG.info("get interrupted exception when sleep: ", e); - continue; - } - } - + protected void runAfterCatalogReady() { ImmutableMap backends = Catalog.getCurrentSystemInfo().getIdToBackend(); long start = System.currentTimeMillis(); diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 6cb3c0f716..715da1fa23 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -934,6 +934,5 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static boolean force_do_metadata_checkpoint = false; - } diff --git a/fe/src/main/java/org/apache/doris/external/EsStateStore.java b/fe/src/main/java/org/apache/doris/external/EsStateStore.java index 030136b9da..5cf3fdaff8 100644 --- a/fe/src/main/java/org/apache/doris/external/EsStateStore.java +++ b/fe/src/main/java/org/apache/doris/external/EsStateStore.java @@ -30,19 +30,16 @@ import org.apache.doris.catalog.Table.TableType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; -import org.apache.doris.common.util.Daemon; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import org.apache.doris.common.util.MasterDaemon; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.google.common.collect.Range; -import okhttp3.Authenticator; -import okhttp3.Call; -import okhttp3.Credentials; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import okhttp3.Route; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; + import java.io.IOException; import java.util.Collections; import java.util.Comparator; @@ -50,21 +47,25 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; - -import org.json.JSONObject; +import okhttp3.Authenticator; +import okhttp3.Call; +import okhttp3.Credentials; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.Route; /** * it is used to call es api to get shard allocation state */ -public class EsStateStore extends Daemon { - +public class EsStateStore extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(EsStateStore.class); private Map esTables; public EsStateStore() { - super(Config.es_state_sync_interval_second * 1000); + super("es state store", Config.es_state_sync_interval_second * 1000); esTables = Maps.newConcurrentMap(); } @@ -80,8 +81,9 @@ public class EsStateStore extends Daemon { esTables.remove(tableId); LOG.info("deregister table [{}] from sync list", tableId); } - - protected void runOneCycle() { + + @Override + protected void runAfterCatalogReady() { for (EsTable esTable : esTables.values()) { try { EsRestClient client = new EsRestClient(esTable.getSeeds(), diff --git a/fe/src/main/java/org/apache/doris/load/FailMsg.java b/fe/src/main/java/org/apache/doris/load/FailMsg.java index bd80a45251..b60763d5c1 100644 --- a/fe/src/main/java/org/apache/doris/load/FailMsg.java +++ b/fe/src/main/java/org/apache/doris/load/FailMsg.java @@ -32,7 +32,8 @@ public class FailMsg implements Writable { ETL_QUALITY_UNSATISFIED, LOAD_RUN_FAIL, TIMEOUT, - UNKNOWN + UNKNOWN, + TXN_UNKNOWN // cancelled because txn status is unknown } private CancelType cancelType; diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 9e3ef12850..184e4fcc1d 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -39,6 +39,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.EtlStatus; import org.apache.doris.load.FailMsg; +import org.apache.doris.load.FailMsg.CancelType; import org.apache.doris.load.Load; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.PaloPrivilege; @@ -565,10 +566,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements * true: abort txn when cancel job, false: only change the state of job and ignore abort txn */ protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) { - LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) - .add("transaction_id", transactionId) - .add("error_msg", "Failed to execute load with error: " + failMsg.getMsg()) - .build()); + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id).add("transaction_id", transactionId) + .add("error_msg", "Failed to execute load with error: " + failMsg.getMsg()).build()); // clean the loadingStatus loadingStatus.setState(TEtlState.CANCELLED); @@ -585,7 +584,12 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements // set failMsg and state this.failMsg = failMsg; - finishTimestamp = System.currentTimeMillis(); + if (failMsg.getCancelType() == CancelType.TXN_UNKNOWN) { + // for bug fix, see LoadManager's fixLoadJobMetaBugs() method + finishTimestamp = createTimestamp; + } else { + finishTimestamp = System.currentTimeMillis(); + } // remove callback before abortTransaction(), so that the afterAborted() callback will not be called again Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id); @@ -624,7 +628,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id); state = JobState.FINISHED; - MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); + if (MetricRepo.isInit.get()) { + MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); + } } protected boolean checkDataQuality() { diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index d510d1070f..b4e542f210 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -33,6 +33,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; import org.apache.doris.load.FailMsg.CancelType; @@ -592,16 +593,20 @@ public class LoadManager implements Writable{ } } - // in previous implementation, there is a bug that when the job's corresponding transaction is - // COMMITTED but not VISIBLE, the load job's state is LOADING, so that the job may be CANCELLED - // by timeout checker, which is not right. - // So here we will check each LOADING load jobs' txn status, if it is COMMITTED, change load job's - // state to COMMITTED. - // this method should be removed at next upgrading. - // only mini load job will be in LOADING state when persist, because mini load job is executed before writing - // edit log. - public void transferLoadingStateToCommitted(GlobalTransactionMgr txnMgr) { + // This method is only for bug fix. And should be call after image and edit log are replayed. + public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) { for (LoadJob job : idToLoadJob.values()) { + /* + * Bug 1: + * in previous implementation, there is a bug that when the job's corresponding transaction is + * COMMITTED but not VISIBLE, the load job's state is LOADING, so that the job may be CANCELLED + * by timeout checker, which is not right. + * So here we will check each LOADING load jobs' txn status, if it is COMMITTED, change load job's + * state to COMMITTED. + * this method should be removed at next upgrading. + * only mini load job will be in LOADING state when persist, because mini load job is executed before writing + * edit log. + */ if (job.getState() == JobState.LOADING) { // unfortunately, transaction id in load job is also not persisted, so we have to traverse // all transactions to find it. @@ -609,13 +614,14 @@ public class LoadManager implements Writable{ Sets.newHashSet(TransactionStatus.COMMITTED)); if (txn != null) { job.updateState(JobState.COMMITTED); - LOG.info("transfer load job {} state from LOADING to COMMITTED, because txn {} is COMMITTED", - job.getId(), txn.getTransactionId()); + LOG.info("transfer load job {} state from LOADING to COMMITTED, because txn {} is COMMITTED." + + " label: {}, db: {}", job.getId(), txn.getTransactionId(), job.getLabel(), job.getDbId()); continue; } } /* + * Bug 2: * There is bug in Doris version 0.10.15. When a load job in PENDING or LOADING * state was replayed from image (not through the edit log), we forgot to add * the corresponding callback id in the CallbackFactory. As a result, the @@ -644,7 +650,7 @@ public class LoadManager implements Writable{ LOG.info("transfer load job {} state from {} to FINISHED, because txn {} is VISIBLE", job.getId(), prevState, txn.getTransactionId()); } else if (txn.getTransactionStatus() == TransactionStatus.ABORTED) { - job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL), false, false); + job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false); LOG.info("transfer load job {} state from {} to CANCELLED, because txn {} is ABORTED", job.getId(), prevState, txn.getTransactionId()); } else { @@ -652,15 +658,33 @@ public class LoadManager implements Writable{ } continue; } - - // unfortunately, the txn may already been removed due to label expired, so we don't know the txn's - // status. But we have to set the job as FINISHED, to void user load same data twice. - job.updateState(JobState.UNKNOWN); - job.failMsg = new FailMsg(CancelType.UNKNOWN, "transaction status is unknown"); - LOG.info("finish load job {} from {} to UNKNOWN, because transaction status is unknown. label: {}", - job.getId(), prevState, job.getLabel()); + + if (job.getJobType() == EtlJobType.MINI) { + // for mini load job, just set it as CANCELLED, because mini load is a synchronous load. + // it would be failed if FE restart. + job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false); + LOG.info("transfer mini load job {} state from {} to CANCELLED, because transaction status is unknown" + + ". label: {}, db: {}", + job.getId(), prevState, job.getLabel(), job.getDbId()); + } else { + // txn is not found. here are 2 cases: + // 1. txn is not start yet, so we can just set job to CANCELLED, and user need to submit the job again. + // 2. because of the bug, txn is ABORTED of VISIBLE, and job is not finished. and txn is expired and + // be removed from transaction manager. So we don't know this job is finished or cancelled. + // in this case, the job should has been submitted long ago (otherwise the txn could not have been + // removed by expiration). + // Without affecting the first case of job, we set the job finish time to be the same as the create time. + // In this way, the second type of job will be automatically cleared after running removeOldLoadJob(); + + // use CancelType.UNKNOWN, so that we can set finish time to be the same as the create time + job.cancelJobWithoutCheck(new FailMsg(CancelType.TXN_UNKNOWN, "transaction status is unknown"), false, false); + LOG.info("finish load job {} from {} to CANCELLED, because transaction status is unknown. label: {}, db: {}, create: {}", + job.getId(), prevState, job.getLabel(), job.getDbId(), TimeUtils.longToTimeString(job.getCreateTimestamp())); + } } } + + removeOldLoadJob(); } @Override diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java index 9be80210f0..77b4fc15d6 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java @@ -34,12 +34,16 @@ import org.apache.doris.transaction.TransactionState; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Set; public class MiniLoadJob extends LoadJob { + private static final Logger LOG = LogManager.getLogger(MiniLoadJob.class); private String tableName; @@ -102,6 +106,12 @@ public class MiniLoadJob extends LoadJob { private void updateLoadingStatue(TransactionState txnState) { MiniLoadTxnCommitAttachment miniLoadTxnCommitAttachment = (MiniLoadTxnCommitAttachment) txnState.getTxnCommitAttachment(); + if (miniLoadTxnCommitAttachment == null) { + // aborted txn may not has attachment + LOG.info("no miniLoadTxnCommitAttachment, txn id: {} status: {}", txnState.getTransactionId(), + txnState.getTransactionStatus()); + return; + } loadingStatus.replaceCounter(DPP_ABNORMAL_ALL, String.valueOf(miniLoadTxnCommitAttachment.getFilteredRows())); loadingStatus.replaceCounter(DPP_NORMAL_ALL, String.valueOf(miniLoadTxnCommitAttachment.getLoadedRows())); if (miniLoadTxnCommitAttachment.getErrorLogUrl() != null) { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 95aaf1abfd..fe50d50db4 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -39,6 +39,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.TimeUtils; @@ -79,7 +80,6 @@ import java.util.Map; import java.util.Optional; import java.util.Queue; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -308,15 +308,6 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl lock.writeLock().unlock(); } - protected boolean tryWriteLock(long timeout, TimeUnit unit) { - try { - return this.lock.writeLock().tryLock(timeout, unit); - } catch (InterruptedException e) { - LOG.warn("failed to try write lock at db[" + id + "]", e); - return false; - } - } - public String getName() { return name; } @@ -439,6 +430,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl // the corresponding txn will be aborted by txn manager. // and after renew, the previous task is removed from routineLoadTaskInfoList, // so task can no longer be committed successfully. + // the already committed task will not be handled here. RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo); Catalog.getCurrentCatalog().getRoutineLoadTaskScheduler().addTaskInQueue(newTask); } @@ -698,7 +690,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst(); RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get(); taskBeId = routineLoadTaskInfo.getBeId(); - executeCommitTask(routineLoadTaskInfo, txnState); + executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED); ++committedTaskNum; } } catch (Throwable e) { @@ -720,6 +712,56 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl LOG.debug("replay on committed: {}", txnState); } + /* + * the corresponding txn is visible, create a new task + */ + @Override + public void afterVisible(TransactionState txnState, boolean txnOperated) { + if (!txnOperated) { + String msg = String.format( + "should not happen, we find that txnOperated if false when handling afterVisble. job id: %d, txn id: %d", + id, txnState.getTransactionId()); + LOG.warn(msg); + // print a log and return. + // if this really happen, the job will be blocked, and this task can be seen by + // "show routine load task" stmt, which is in COMMITTED state for a long time. + // so we can find this error and step in. + return; + } + + writeLock(); + try { + if (state != JobState.RUNNING) { + // job is not running, nothing need to be done + return; + } + + Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter( + entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst(); + RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get(); + if (routineLoadTaskInfo.getTxnStatus() != TransactionStatus.COMMITTED) { + // TODO(cmy): Normally, this should not happen. But for safe reason, just pause the job + String msg = String.format( + "should not happen, we find that task %s is not COMMITTED when handling afterVisble. job id: %d, txn id: %d, txn status: %s", + DebugUtil.printId(routineLoadTaskInfo.getId()), id, txnState.getTransactionId(), routineLoadTaskInfo.getTxnStatus().name()); + LOG.warn(msg); + try { + updateState(JobState.PAUSED, msg, false /* not replay */); + } catch (UserException e) { + // should not happen + LOG.warn("failed to pause the job {}. this should not happen", id, e); + } + return; + } + + // create new task + RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); + Catalog.getCurrentCatalog().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo); + } finally { + writeUnlock(); + } + } + // the task is aborted when the correct number of rows is more then 0 // be will abort txn when all of kafka data is wrong or total consume data is 0 // txn will be aborted but progress will be update @@ -765,7 +807,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl // TODO(ml): use previous be id depend on change reason } // step2: commit task , update progress, maybe create a new task - executeCommitTask(routineLoadTaskInfo, txnState); + executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED); } } catch (Exception e) { updateState(JobState.PAUSED, "be " + taskBeId + " abort task " + txnState.getLabel() + " failed with error " + e.getMessage(), @@ -791,8 +833,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } // check task exists or not before call method - private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState) - throws UserException { + private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState, + TransactionStatus txnStatus) throws UserException { // step0: get progress from transaction state RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment(); if (rlTaskTxnCommitAttachment == null) { @@ -814,9 +856,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } if (state == JobState.RUNNING) { - // step2: create a new task for partitions - RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); - Catalog.getCurrentCatalog().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo); + routineLoadTaskInfo.setTxnStatus(txnStatus); + if (txnStatus == TransactionStatus.ABORTED) { + RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); + Catalog.getCurrentCatalog().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo); + } else if (txnStatus == TransactionStatus.COMMITTED) { + // this txn is just COMMITTED, create new task when the this txn is VISIBLE + // or if publish version task has some error, there will be lots of COMMITTED txns in GlobalTransactionMgr + } } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index b7fb7a5395..3af9fb477e 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -29,6 +29,7 @@ import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; import com.google.common.collect.Lists; @@ -66,6 +67,10 @@ public abstract class RoutineLoadTaskInfo { protected long timeoutMs = -1; + // this status will be set when corresponding transaction's status is changed. + // so that user or other logic can know the status of the corresponding txn. + protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN; + public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs) { this.id = id; this.jobId = jobId; @@ -127,7 +132,20 @@ public abstract class RoutineLoadTaskInfo { return timeoutMs; } + public void setTxnStatus(TransactionStatus txnStatus) { + this.txnStatus = txnStatus; + } + + public TransactionStatus getTxnStatus() { + return txnStatus; + } + public boolean isTimeout() { + if (txnStatus == TransactionStatus.COMMITTED || txnStatus == TransactionStatus.VISIBLE) { + // the corresponding txn is already finished, this task can not be treated as timeout. + return false; + } + if (isRunning() && System.currentTimeMillis() - executeStartTimeMs > timeoutMs) { LOG.debug("task {} is timeout. start: {}, timeout: {}", DebugUtil.printId(id), executeStartTimeMs, timeoutMs); @@ -167,6 +185,7 @@ public abstract class RoutineLoadTaskInfo { List row = Lists.newArrayList(); row.add(DebugUtil.printId(id)); row.add(String.valueOf(txnId)); + row.add(txnStatus.name()); row.add(String.valueOf(jobId)); row.add(String.valueOf(TimeUtils.longToTimeString(createTimeMs))); row.add(String.valueOf(TimeUtils.longToTimeString(executeStartTimeMs))); diff --git a/fe/src/main/java/org/apache/doris/master/Checkpoint.java b/fe/src/main/java/org/apache/doris/master/Checkpoint.java index 77d9beb0c7..9c741ef8ac 100644 --- a/fe/src/main/java/org/apache/doris/master/Checkpoint.java +++ b/fe/src/main/java/org/apache/doris/master/Checkpoint.java @@ -104,6 +104,8 @@ public class Checkpoint extends MasterDaemon { checkPointVersion, catalog.getReplayedJournalId()); return; } + catalog.fixBugAfterMetadataReplayed(false); + catalog.saveImage(); replayedJournalId = catalog.getReplayedJournalId(); if (MetricRepo.isInit.get()) { diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index ed5da793ca..1e9f5a3e62 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -304,31 +304,20 @@ public class BrokerScanNode extends LoadScanNode { private TScanRangeLocations newLocations(TBrokerScanRangeParams params, String brokerName) throws UserException { - List candidateBes = Lists.newArrayList(); - // Get backend - int numBe = Math.min(3, backends.size()); - for (int i = 0; i < numBe; ++i) { - candidateBes.add(backends.get(nextBe++)); - nextBe = nextBe % backends.size(); - } - // we shuffle it because if we only has 3 backends - // we will always choose the same backends without shuffle - Collections.shuffle(candidateBes); + Backend selectedBackend = backends.get(nextBe++); + nextBe = nextBe % backends.size(); // Generate on broker scan range TBrokerScanRange brokerScanRange = new TBrokerScanRange(); brokerScanRange.setParams(params); - int numBroker = Math.min(3, numBe); - for (int i = 0; i < numBroker; ++i) { - FsBroker broker = null; - try { - broker = Catalog.getInstance().getBrokerMgr().getBroker( - brokerName, candidateBes.get(i).getHost()); - } catch (AnalysisException e) { - throw new UserException(e.getMessage()); - } - brokerScanRange.addToBroker_addresses(new TNetworkAddress(broker.ip, broker.port)); + + FsBroker broker = null; + try { + broker = Catalog.getInstance().getBrokerMgr().getBroker(brokerName, selectedBackend.getHost()); + } catch (AnalysisException e) { + throw new UserException(e.getMessage()); } + brokerScanRange.addToBroker_addresses(new TNetworkAddress(broker.ip, broker.port)); // Scan range TScanRange scanRange = new TScanRange(); @@ -337,12 +326,11 @@ public class BrokerScanNode extends LoadScanNode { // Locations TScanRangeLocations locations = new TScanRangeLocations(); locations.setScan_range(scanRange); - for (Backend be : candidateBes) { - TScanRangeLocation location = new TScanRangeLocation(); - location.setBackend_id(be.getId()); - location.setServer(new TNetworkAddress(be.getHost(), be.getBePort())); - locations.addToLocations(location); - } + + TScanRangeLocation location = new TScanRangeLocation(); + location.setBackend_id(selectedBackend.getId()); + location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); + locations.addToLocations(location); return locations; } @@ -401,12 +389,12 @@ public class BrokerScanNode extends LoadScanNode { private void assignBackends() throws UserException { backends = Lists.newArrayList(); for (Backend be : Catalog.getCurrentSystemInfo().getIdToBackend().values()) { - if (be.isAlive()) { + if (be.isAvailable()) { backends.add(be); } } if (backends.isEmpty()) { - throw new UserException("No Alive backends"); + throw new UserException("No available backends"); } Collections.shuffle(backends, random); } diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 6ec041ef87..cfb83797cb 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -216,7 +216,7 @@ public class GlobalTransactionMgr implements Writable { private void checkRunningTxnExceedLimit(long dbId, LoadJobSourceType sourceType) throws BeginTransactionException { switch (sourceType) { case ROUTINE_LOAD_TASK: - // we do not limit the txn num of routine load here. for 2 reasons: + // no need to check limit for routine load task: // 1. the number of running routine load tasks is limited by Config.max_routine_load_task_num_per_be // 2. if we add routine load txn to runningTxnNums, runningTxnNums will always be occupied by routine load, // and other txn may not be able to submitted. diff --git a/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java b/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java index e628939a5e..040c1f2d03 100644 --- a/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java +++ b/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java @@ -110,7 +110,7 @@ abstract public class DorisHttpTestCase { public static final int HTTP_PORT; static { Random r = new Random(System.currentTimeMillis()); - HTTP_PORT = 30000 + r.nextInt(10000); + HTTP_PORT = 20000 + r.nextInt(10000); } protected static final String URI = "http://localhost:" + HTTP_PORT + "/api/" + DB_NAME + "/" + TABLE_NAME; diff --git a/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java b/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java index 8b71af71c0..7007c9499d 100644 --- a/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java +++ b/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java @@ -247,7 +247,7 @@ public class AuthTest { } // 5.1 resolve domain [palo.domain1] - resolver.runOneCycle(); + resolver.runAfterCatalogReady(); // 6. check if user from resolved ip can access to palo Assert.assertTrue(auth.checkPlainPassword(SystemInfoService.DEFAULT_CLUSTER + ":zhangsan", "10.1.1.1", @@ -296,7 +296,7 @@ public class AuthTest { } // 8.1 resolve domain [palo.domain2] - resolver.runOneCycle(); + resolver.runAfterCatalogReady(); Assert.assertTrue(auth.checkPlainPassword(SystemInfoService.DEFAULT_CLUSTER + ":lisi", "20.1.1.1", "123456")); @@ -470,7 +470,7 @@ public class AuthTest { SystemInfoService.DEFAULT_CLUSTER + ":zhangsan", PrivPredicate.ALTER)); - resolver.runOneCycle(); + resolver.runAfterCatalogReady(); Assert.assertTrue(auth.checkDbPriv("10.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db3", SystemInfoService.DEFAULT_CLUSTER + ":zhangsan", @@ -502,7 +502,7 @@ public class AuthTest { SystemInfoService.DEFAULT_CLUSTER + ":zhangsan", PrivPredicate.SELECT)); - resolver.runOneCycle(); + resolver.runAfterCatalogReady(); Assert.assertTrue(auth.checkDbPriv("10.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db3", SystemInfoService.DEFAULT_CLUSTER + ":zhangsan", @@ -714,7 +714,7 @@ public class AuthTest { Assert.assertTrue(auth.checkDbPriv("10.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db3", SystemInfoService.DEFAULT_CLUSTER + ":zhangsan", PrivPredicate.DROP)); - resolver.runOneCycle(); + resolver.runAfterCatalogReady(); Assert.assertFalse(auth.checkDbPriv("10.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db3", SystemInfoService.DEFAULT_CLUSTER + ":zhangsan", PrivPredicate.DROP)); @@ -851,7 +851,7 @@ public class AuthTest { Assert.assertFalse(auth.checkDbPriv("20.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db4", SystemInfoService.DEFAULT_CLUSTER + ":chenliu", PrivPredicate.DROP)); - resolver.runOneCycle(); + resolver.runAfterCatalogReady(); Assert.assertTrue(auth.checkDbPriv("20.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db4", SystemInfoService.DEFAULT_CLUSTER + ":chenliu", PrivPredicate.DROP)); @@ -894,7 +894,7 @@ public class AuthTest { Assert.assertTrue(auth.checkDbPriv("20.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db4", SystemInfoService.DEFAULT_CLUSTER + ":chenliu", PrivPredicate.DROP)); - resolver.runOneCycle(); + resolver.runAfterCatalogReady(); Assert.assertFalse(auth.checkDbPriv("20.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db4", SystemInfoService.DEFAULT_CLUSTER + ":chenliu", PrivPredicate.DROP));