diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java index 726bd153cc..03bd751daa 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeJob.java @@ -1044,7 +1044,9 @@ public class SchemaChangeJob extends AlterJob { db.writeLock(); try { OlapTable olapTable = (OlapTable) db.getTable(tableId); - olapTable.setState(OlapTableState.NORMAL); + if (olapTable != null) { + olapTable.setState(OlapTableState.NORMAL); + } } finally { db.writeUnlock(); } diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java index e826debccc..d280704d9d 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java @@ -42,6 +42,7 @@ public class ShowRoutineLoadTaskStmt extends ShowStmt { new ImmutableList.Builder() .add("TaskId") .add("TxnId") + .add("TxnStatus") .add("JobId") .add("CreateTime") .add("ExecuteStartTime") diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index bfba9ab84b..dab508474b 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -272,7 +272,7 @@ public class Catalog { private BackupHandler backupHandler; private PublishVersionDaemon publishVersionDaemon; - private Daemon labelCleaner; // To clean old LabelInfo, ExportJobInfos + private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos private MasterDaemon txnCleaner; // To clean aborted or timeout txns private Daemon replayer; private Daemon timePrinter; @@ -1086,6 +1086,11 @@ public class Catalog { MasterInfo info = new MasterInfo(this.masterIp, this.masterHttpPort, this.masterRpcPort); editLog.logMasterInfo(info); + // for master, the 'isReady' is set behind. + // but we are sure that all metadata is replayed if we get here. + // so no need to check 'isReady' flag in this method + fixBugAfterMetadataReplayed(false); + // start all daemon threads that only running on MASTER FE startMasterOnlyDaemonThreads(); // start other daemon threads that should running on all FE @@ -1101,6 +1106,24 @@ public class Catalog { LOG.info(msg); } + /* + * Add anything necessary here if there is meta data need to be fixed. + */ + public void fixBugAfterMetadataReplayed(boolean waitCatalogReady) { + if (waitCatalogReady) { + while (!isReady()) { + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + LOG.info("start to fix meta data bug"); + loadManager.fixLoadJobMetaBugs(globalTransactionMgr); + } + // start all daemon threads only running on Master private void startMasterOnlyDaemonThreads() { // start checkpoint thread @@ -1194,6 +1217,9 @@ public class Catalog { replayer.start(); } + // 'isReady' will be set to true in 'setCanRead()' method + fixBugAfterMetadataReplayed(true); + startNonMasterDaemonThreads(); MetricRepo.init(); @@ -1720,7 +1746,6 @@ public class Catalog { public long loadLoadJobsV2(DataInputStream in, long checksum) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_50) { loadManager.readFields(in); - loadManager.transferLoadingStateToCommitted(globalTransactionMgr); } return checksum; } @@ -2055,9 +2080,9 @@ public class Catalog { } public void createLabelCleaner() { - labelCleaner = new Daemon("LoadLabelCleaner", Config.label_clean_interval_second * 1000L) { + labelCleaner = new MasterDaemon("LoadLabelCleaner", Config.label_clean_interval_second * 1000L) { @Override - protected void runOneCycle() { + protected void runAfterCatalogReady() { load.removeOldLoadJobs(); load.removeOldDeleteJobs(); loadManager.removeOldLoadJob(); diff --git a/fe/src/main/java/org/apache/doris/catalog/DomainResolver.java b/fe/src/main/java/org/apache/doris/catalog/DomainResolver.java index 6aaff5d3db..214be4d8df 100644 --- a/fe/src/main/java/org/apache/doris/catalog/DomainResolver.java +++ b/fe/src/main/java/org/apache/doris/catalog/DomainResolver.java @@ -17,7 +17,7 @@ package org.apache.doris.catalog; -import org.apache.doris.common.util.Daemon; +import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.mysql.privilege.PaloAuth; import com.google.common.base.Strings; @@ -35,30 +35,21 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -public class DomainResolver extends Daemon { +public class DomainResolver extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(DomainResolver.class); private static final String BNS_RESOLVER_TOOLS_PATH = "/usr/bin/get_instance_by_service"; private PaloAuth auth; - private AtomicBoolean isStart = new AtomicBoolean(false); - public DomainResolver(PaloAuth auth) { super("domain resolver", 10 * 1000); this.auth = auth; } + // 'public' for test @Override - public synchronized void start() { - if (isStart.compareAndSet(false, true)) { - super.start(); - } - } - - @Override - public void runOneCycle() { + public void runAfterCatalogReady() { // qualified user name -> domain name Map> userMap = Maps.newHashMap(); auth.getCopiedWhiteList(userMap); diff --git a/fe/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index 3825c7a0c0..bf29eec9e7 100644 --- a/fe/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -21,7 +21,7 @@ import org.apache.doris.catalog.MaterializedIndex.IndexExtState; import org.apache.doris.catalog.Table.TableType; import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; -import org.apache.doris.common.util.Daemon; +import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.system.Backend; import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TNetworkAddress; @@ -40,7 +40,7 @@ import java.util.Map; * TabletStatMgr is for collecting tablet(replica) statistics from backends. * Each FE will collect by itself. */ -public class TabletStatMgr extends Daemon { +public class TabletStatMgr extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(TabletStatMgr.class); public TabletStatMgr() { @@ -48,19 +48,7 @@ public class TabletStatMgr extends Daemon { } @Override - protected void runOneCycle() { - // We should wait Frontend finished replaying logs, then begin to get tablet status - while (!Catalog.getInstance().isReady()) { - try { - // sleep here, not return. because if we return, we have to wait until next round, which may - // take a long time(default is tablet_stat_update_interval_second: 5 min) - Thread.sleep(10000); - } catch (InterruptedException e) { - LOG.info("get interrupted exception when sleep: ", e); - continue; - } - } - + protected void runAfterCatalogReady() { ImmutableMap backends = Catalog.getCurrentSystemInfo().getIdToBackend(); long start = System.currentTimeMillis(); diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 6cb3c0f716..715da1fa23 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -934,6 +934,5 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static boolean force_do_metadata_checkpoint = false; - } diff --git a/fe/src/main/java/org/apache/doris/external/EsStateStore.java b/fe/src/main/java/org/apache/doris/external/EsStateStore.java index 030136b9da..5cf3fdaff8 100644 --- a/fe/src/main/java/org/apache/doris/external/EsStateStore.java +++ b/fe/src/main/java/org/apache/doris/external/EsStateStore.java @@ -30,19 +30,16 @@ import org.apache.doris.catalog.Table.TableType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; -import org.apache.doris.common.util.Daemon; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import org.apache.doris.common.util.MasterDaemon; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.google.common.collect.Range; -import okhttp3.Authenticator; -import okhttp3.Call; -import okhttp3.Credentials; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import okhttp3.Route; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.json.JSONObject; + import java.io.IOException; import java.util.Collections; import java.util.Comparator; @@ -50,21 +47,25 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; - -import org.json.JSONObject; +import okhttp3.Authenticator; +import okhttp3.Call; +import okhttp3.Credentials; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.Route; /** * it is used to call es api to get shard allocation state */ -public class EsStateStore extends Daemon { - +public class EsStateStore extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(EsStateStore.class); private Map esTables; public EsStateStore() { - super(Config.es_state_sync_interval_second * 1000); + super("es state store", Config.es_state_sync_interval_second * 1000); esTables = Maps.newConcurrentMap(); } @@ -80,8 +81,9 @@ public class EsStateStore extends Daemon { esTables.remove(tableId); LOG.info("deregister table [{}] from sync list", tableId); } - - protected void runOneCycle() { + + @Override + protected void runAfterCatalogReady() { for (EsTable esTable : esTables.values()) { try { EsRestClient client = new EsRestClient(esTable.getSeeds(), diff --git a/fe/src/main/java/org/apache/doris/load/FailMsg.java b/fe/src/main/java/org/apache/doris/load/FailMsg.java index bd80a45251..b60763d5c1 100644 --- a/fe/src/main/java/org/apache/doris/load/FailMsg.java +++ b/fe/src/main/java/org/apache/doris/load/FailMsg.java @@ -32,7 +32,8 @@ public class FailMsg implements Writable { ETL_QUALITY_UNSATISFIED, LOAD_RUN_FAIL, TIMEOUT, - UNKNOWN + UNKNOWN, + TXN_UNKNOWN // cancelled because txn status is unknown } private CancelType cancelType; diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 9e3ef12850..184e4fcc1d 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -39,6 +39,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.EtlStatus; import org.apache.doris.load.FailMsg; +import org.apache.doris.load.FailMsg.CancelType; import org.apache.doris.load.Load; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.PaloPrivilege; @@ -565,10 +566,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements * true: abort txn when cancel job, false: only change the state of job and ignore abort txn */ protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) { - LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) - .add("transaction_id", transactionId) - .add("error_msg", "Failed to execute load with error: " + failMsg.getMsg()) - .build()); + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id).add("transaction_id", transactionId) + .add("error_msg", "Failed to execute load with error: " + failMsg.getMsg()).build()); // clean the loadingStatus loadingStatus.setState(TEtlState.CANCELLED); @@ -585,7 +584,12 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements // set failMsg and state this.failMsg = failMsg; - finishTimestamp = System.currentTimeMillis(); + if (failMsg.getCancelType() == CancelType.TXN_UNKNOWN) { + // for bug fix, see LoadManager's fixLoadJobMetaBugs() method + finishTimestamp = createTimestamp; + } else { + finishTimestamp = System.currentTimeMillis(); + } // remove callback before abortTransaction(), so that the afterAborted() callback will not be called again Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id); @@ -624,7 +628,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id); state = JobState.FINISHED; - MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); + if (MetricRepo.isInit.get()) { + MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); + } } protected boolean checkDataQuality() { diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index d510d1070f..b4e542f210 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -33,6 +33,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; import org.apache.doris.load.FailMsg.CancelType; @@ -592,16 +593,20 @@ public class LoadManager implements Writable{ } } - // in previous implementation, there is a bug that when the job's corresponding transaction is - // COMMITTED but not VISIBLE, the load job's state is LOADING, so that the job may be CANCELLED - // by timeout checker, which is not right. - // So here we will check each LOADING load jobs' txn status, if it is COMMITTED, change load job's - // state to COMMITTED. - // this method should be removed at next upgrading. - // only mini load job will be in LOADING state when persist, because mini load job is executed before writing - // edit log. - public void transferLoadingStateToCommitted(GlobalTransactionMgr txnMgr) { + // This method is only for bug fix. And should be call after image and edit log are replayed. + public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) { for (LoadJob job : idToLoadJob.values()) { + /* + * Bug 1: + * in previous implementation, there is a bug that when the job's corresponding transaction is + * COMMITTED but not VISIBLE, the load job's state is LOADING, so that the job may be CANCELLED + * by timeout checker, which is not right. + * So here we will check each LOADING load jobs' txn status, if it is COMMITTED, change load job's + * state to COMMITTED. + * this method should be removed at next upgrading. + * only mini load job will be in LOADING state when persist, because mini load job is executed before writing + * edit log. + */ if (job.getState() == JobState.LOADING) { // unfortunately, transaction id in load job is also not persisted, so we have to traverse // all transactions to find it. @@ -609,13 +614,14 @@ public class LoadManager implements Writable{ Sets.newHashSet(TransactionStatus.COMMITTED)); if (txn != null) { job.updateState(JobState.COMMITTED); - LOG.info("transfer load job {} state from LOADING to COMMITTED, because txn {} is COMMITTED", - job.getId(), txn.getTransactionId()); + LOG.info("transfer load job {} state from LOADING to COMMITTED, because txn {} is COMMITTED." + + " label: {}, db: {}", job.getId(), txn.getTransactionId(), job.getLabel(), job.getDbId()); continue; } } /* + * Bug 2: * There is bug in Doris version 0.10.15. When a load job in PENDING or LOADING * state was replayed from image (not through the edit log), we forgot to add * the corresponding callback id in the CallbackFactory. As a result, the @@ -644,7 +650,7 @@ public class LoadManager implements Writable{ LOG.info("transfer load job {} state from {} to FINISHED, because txn {} is VISIBLE", job.getId(), prevState, txn.getTransactionId()); } else if (txn.getTransactionStatus() == TransactionStatus.ABORTED) { - job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL), false, false); + job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false); LOG.info("transfer load job {} state from {} to CANCELLED, because txn {} is ABORTED", job.getId(), prevState, txn.getTransactionId()); } else { @@ -652,15 +658,33 @@ public class LoadManager implements Writable{ } continue; } - - // unfortunately, the txn may already been removed due to label expired, so we don't know the txn's - // status. But we have to set the job as FINISHED, to void user load same data twice. - job.updateState(JobState.UNKNOWN); - job.failMsg = new FailMsg(CancelType.UNKNOWN, "transaction status is unknown"); - LOG.info("finish load job {} from {} to UNKNOWN, because transaction status is unknown. label: {}", - job.getId(), prevState, job.getLabel()); + + if (job.getJobType() == EtlJobType.MINI) { + // for mini load job, just set it as CANCELLED, because mini load is a synchronous load. + // it would be failed if FE restart. + job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false); + LOG.info("transfer mini load job {} state from {} to CANCELLED, because transaction status is unknown" + + ". label: {}, db: {}", + job.getId(), prevState, job.getLabel(), job.getDbId()); + } else { + // txn is not found. here are 2 cases: + // 1. txn is not start yet, so we can just set job to CANCELLED, and user need to submit the job again. + // 2. because of the bug, txn is ABORTED of VISIBLE, and job is not finished. and txn is expired and + // be removed from transaction manager. So we don't know this job is finished or cancelled. + // in this case, the job should has been submitted long ago (otherwise the txn could not have been + // removed by expiration). + // Without affecting the first case of job, we set the job finish time to be the same as the create time. + // In this way, the second type of job will be automatically cleared after running removeOldLoadJob(); + + // use CancelType.UNKNOWN, so that we can set finish time to be the same as the create time + job.cancelJobWithoutCheck(new FailMsg(CancelType.TXN_UNKNOWN, "transaction status is unknown"), false, false); + LOG.info("finish load job {} from {} to CANCELLED, because transaction status is unknown. label: {}, db: {}, create: {}", + job.getId(), prevState, job.getLabel(), job.getDbId(), TimeUtils.longToTimeString(job.getCreateTimestamp())); + } } } + + removeOldLoadJob(); } @Override diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java index 9be80210f0..77b4fc15d6 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java @@ -34,12 +34,16 @@ import org.apache.doris.transaction.TransactionState; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Set; public class MiniLoadJob extends LoadJob { + private static final Logger LOG = LogManager.getLogger(MiniLoadJob.class); private String tableName; @@ -102,6 +106,12 @@ public class MiniLoadJob extends LoadJob { private void updateLoadingStatue(TransactionState txnState) { MiniLoadTxnCommitAttachment miniLoadTxnCommitAttachment = (MiniLoadTxnCommitAttachment) txnState.getTxnCommitAttachment(); + if (miniLoadTxnCommitAttachment == null) { + // aborted txn may not has attachment + LOG.info("no miniLoadTxnCommitAttachment, txn id: {} status: {}", txnState.getTransactionId(), + txnState.getTransactionStatus()); + return; + } loadingStatus.replaceCounter(DPP_ABNORMAL_ALL, String.valueOf(miniLoadTxnCommitAttachment.getFilteredRows())); loadingStatus.replaceCounter(DPP_NORMAL_ALL, String.valueOf(miniLoadTxnCommitAttachment.getLoadedRows())); if (miniLoadTxnCommitAttachment.getErrorLogUrl() != null) { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 95aaf1abfd..fe50d50db4 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -39,6 +39,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.TimeUtils; @@ -79,7 +80,6 @@ import java.util.Map; import java.util.Optional; import java.util.Queue; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -308,15 +308,6 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl lock.writeLock().unlock(); } - protected boolean tryWriteLock(long timeout, TimeUnit unit) { - try { - return this.lock.writeLock().tryLock(timeout, unit); - } catch (InterruptedException e) { - LOG.warn("failed to try write lock at db[" + id + "]", e); - return false; - } - } - public String getName() { return name; } @@ -439,6 +430,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl // the corresponding txn will be aborted by txn manager. // and after renew, the previous task is removed from routineLoadTaskInfoList, // so task can no longer be committed successfully. + // the already committed task will not be handled here. RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo); Catalog.getCurrentCatalog().getRoutineLoadTaskScheduler().addTaskInQueue(newTask); } @@ -698,7 +690,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst(); RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get(); taskBeId = routineLoadTaskInfo.getBeId(); - executeCommitTask(routineLoadTaskInfo, txnState); + executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.COMMITTED); ++committedTaskNum; } } catch (Throwable e) { @@ -720,6 +712,56 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl LOG.debug("replay on committed: {}", txnState); } + /* + * the corresponding txn is visible, create a new task + */ + @Override + public void afterVisible(TransactionState txnState, boolean txnOperated) { + if (!txnOperated) { + String msg = String.format( + "should not happen, we find that txnOperated if false when handling afterVisble. job id: %d, txn id: %d", + id, txnState.getTransactionId()); + LOG.warn(msg); + // print a log and return. + // if this really happen, the job will be blocked, and this task can be seen by + // "show routine load task" stmt, which is in COMMITTED state for a long time. + // so we can find this error and step in. + return; + } + + writeLock(); + try { + if (state != JobState.RUNNING) { + // job is not running, nothing need to be done + return; + } + + Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.stream().filter( + entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst(); + RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get(); + if (routineLoadTaskInfo.getTxnStatus() != TransactionStatus.COMMITTED) { + // TODO(cmy): Normally, this should not happen. But for safe reason, just pause the job + String msg = String.format( + "should not happen, we find that task %s is not COMMITTED when handling afterVisble. job id: %d, txn id: %d, txn status: %s", + DebugUtil.printId(routineLoadTaskInfo.getId()), id, txnState.getTransactionId(), routineLoadTaskInfo.getTxnStatus().name()); + LOG.warn(msg); + try { + updateState(JobState.PAUSED, msg, false /* not replay */); + } catch (UserException e) { + // should not happen + LOG.warn("failed to pause the job {}. this should not happen", id, e); + } + return; + } + + // create new task + RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); + Catalog.getCurrentCatalog().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo); + } finally { + writeUnlock(); + } + } + // the task is aborted when the correct number of rows is more then 0 // be will abort txn when all of kafka data is wrong or total consume data is 0 // txn will be aborted but progress will be update @@ -765,7 +807,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl // TODO(ml): use previous be id depend on change reason } // step2: commit task , update progress, maybe create a new task - executeCommitTask(routineLoadTaskInfo, txnState); + executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED); } } catch (Exception e) { updateState(JobState.PAUSED, "be " + taskBeId + " abort task " + txnState.getLabel() + " failed with error " + e.getMessage(), @@ -791,8 +833,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } // check task exists or not before call method - private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState) - throws UserException { + private void executeTaskOnTxnStatusChanged(RoutineLoadTaskInfo routineLoadTaskInfo, TransactionState txnState, + TransactionStatus txnStatus) throws UserException { // step0: get progress from transaction state RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment(); if (rlTaskTxnCommitAttachment == null) { @@ -814,9 +856,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } if (state == JobState.RUNNING) { - // step2: create a new task for partitions - RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); - Catalog.getCurrentCatalog().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo); + routineLoadTaskInfo.setTxnStatus(txnStatus); + if (txnStatus == TransactionStatus.ABORTED) { + RoutineLoadTaskInfo newRoutineLoadTaskInfo = unprotectRenewTask(routineLoadTaskInfo); + Catalog.getCurrentCatalog().getRoutineLoadTaskScheduler().addTaskInQueue(newRoutineLoadTaskInfo); + } else if (txnStatus == TransactionStatus.COMMITTED) { + // this txn is just COMMITTED, create new task when the this txn is VISIBLE + // or if publish version task has some error, there will be lots of COMMITTED txns in GlobalTransactionMgr + } } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index b7fb7a5395..3af9fb477e 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -29,6 +29,7 @@ import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; import com.google.common.collect.Lists; @@ -66,6 +67,10 @@ public abstract class RoutineLoadTaskInfo { protected long timeoutMs = -1; + // this status will be set when corresponding transaction's status is changed. + // so that user or other logic can know the status of the corresponding txn. + protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN; + public RoutineLoadTaskInfo(UUID id, long jobId, String clusterName, long timeoutMs) { this.id = id; this.jobId = jobId; @@ -127,7 +132,20 @@ public abstract class RoutineLoadTaskInfo { return timeoutMs; } + public void setTxnStatus(TransactionStatus txnStatus) { + this.txnStatus = txnStatus; + } + + public TransactionStatus getTxnStatus() { + return txnStatus; + } + public boolean isTimeout() { + if (txnStatus == TransactionStatus.COMMITTED || txnStatus == TransactionStatus.VISIBLE) { + // the corresponding txn is already finished, this task can not be treated as timeout. + return false; + } + if (isRunning() && System.currentTimeMillis() - executeStartTimeMs > timeoutMs) { LOG.debug("task {} is timeout. start: {}, timeout: {}", DebugUtil.printId(id), executeStartTimeMs, timeoutMs); @@ -167,6 +185,7 @@ public abstract class RoutineLoadTaskInfo { List row = Lists.newArrayList(); row.add(DebugUtil.printId(id)); row.add(String.valueOf(txnId)); + row.add(txnStatus.name()); row.add(String.valueOf(jobId)); row.add(String.valueOf(TimeUtils.longToTimeString(createTimeMs))); row.add(String.valueOf(TimeUtils.longToTimeString(executeStartTimeMs))); diff --git a/fe/src/main/java/org/apache/doris/master/Checkpoint.java b/fe/src/main/java/org/apache/doris/master/Checkpoint.java index 77d9beb0c7..9c741ef8ac 100644 --- a/fe/src/main/java/org/apache/doris/master/Checkpoint.java +++ b/fe/src/main/java/org/apache/doris/master/Checkpoint.java @@ -104,6 +104,8 @@ public class Checkpoint extends MasterDaemon { checkPointVersion, catalog.getReplayedJournalId()); return; } + catalog.fixBugAfterMetadataReplayed(false); + catalog.saveImage(); replayedJournalId = catalog.getReplayedJournalId(); if (MetricRepo.isInit.get()) { diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index ed5da793ca..1e9f5a3e62 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -304,31 +304,20 @@ public class BrokerScanNode extends LoadScanNode { private TScanRangeLocations newLocations(TBrokerScanRangeParams params, String brokerName) throws UserException { - List candidateBes = Lists.newArrayList(); - // Get backend - int numBe = Math.min(3, backends.size()); - for (int i = 0; i < numBe; ++i) { - candidateBes.add(backends.get(nextBe++)); - nextBe = nextBe % backends.size(); - } - // we shuffle it because if we only has 3 backends - // we will always choose the same backends without shuffle - Collections.shuffle(candidateBes); + Backend selectedBackend = backends.get(nextBe++); + nextBe = nextBe % backends.size(); // Generate on broker scan range TBrokerScanRange brokerScanRange = new TBrokerScanRange(); brokerScanRange.setParams(params); - int numBroker = Math.min(3, numBe); - for (int i = 0; i < numBroker; ++i) { - FsBroker broker = null; - try { - broker = Catalog.getInstance().getBrokerMgr().getBroker( - brokerName, candidateBes.get(i).getHost()); - } catch (AnalysisException e) { - throw new UserException(e.getMessage()); - } - brokerScanRange.addToBroker_addresses(new TNetworkAddress(broker.ip, broker.port)); + + FsBroker broker = null; + try { + broker = Catalog.getInstance().getBrokerMgr().getBroker(brokerName, selectedBackend.getHost()); + } catch (AnalysisException e) { + throw new UserException(e.getMessage()); } + brokerScanRange.addToBroker_addresses(new TNetworkAddress(broker.ip, broker.port)); // Scan range TScanRange scanRange = new TScanRange(); @@ -337,12 +326,11 @@ public class BrokerScanNode extends LoadScanNode { // Locations TScanRangeLocations locations = new TScanRangeLocations(); locations.setScan_range(scanRange); - for (Backend be : candidateBes) { - TScanRangeLocation location = new TScanRangeLocation(); - location.setBackend_id(be.getId()); - location.setServer(new TNetworkAddress(be.getHost(), be.getBePort())); - locations.addToLocations(location); - } + + TScanRangeLocation location = new TScanRangeLocation(); + location.setBackend_id(selectedBackend.getId()); + location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); + locations.addToLocations(location); return locations; } @@ -401,12 +389,12 @@ public class BrokerScanNode extends LoadScanNode { private void assignBackends() throws UserException { backends = Lists.newArrayList(); for (Backend be : Catalog.getCurrentSystemInfo().getIdToBackend().values()) { - if (be.isAlive()) { + if (be.isAvailable()) { backends.add(be); } } if (backends.isEmpty()) { - throw new UserException("No Alive backends"); + throw new UserException("No available backends"); } Collections.shuffle(backends, random); } diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 6ec041ef87..cfb83797cb 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -216,7 +216,7 @@ public class GlobalTransactionMgr implements Writable { private void checkRunningTxnExceedLimit(long dbId, LoadJobSourceType sourceType) throws BeginTransactionException { switch (sourceType) { case ROUTINE_LOAD_TASK: - // we do not limit the txn num of routine load here. for 2 reasons: + // no need to check limit for routine load task: // 1. the number of running routine load tasks is limited by Config.max_routine_load_task_num_per_be // 2. if we add routine load txn to runningTxnNums, runningTxnNums will always be occupied by routine load, // and other txn may not be able to submitted. diff --git a/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java b/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java index e628939a5e..040c1f2d03 100644 --- a/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java +++ b/fe/src/test/java/org/apache/doris/http/DorisHttpTestCase.java @@ -110,7 +110,7 @@ abstract public class DorisHttpTestCase { public static final int HTTP_PORT; static { Random r = new Random(System.currentTimeMillis()); - HTTP_PORT = 30000 + r.nextInt(10000); + HTTP_PORT = 20000 + r.nextInt(10000); } protected static final String URI = "http://localhost:" + HTTP_PORT + "/api/" + DB_NAME + "/" + TABLE_NAME; diff --git a/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java b/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java index 8b71af71c0..7007c9499d 100644 --- a/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java +++ b/fe/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java @@ -247,7 +247,7 @@ public class AuthTest { } // 5.1 resolve domain [palo.domain1] - resolver.runOneCycle(); + resolver.runAfterCatalogReady(); // 6. check if user from resolved ip can access to palo Assert.assertTrue(auth.checkPlainPassword(SystemInfoService.DEFAULT_CLUSTER + ":zhangsan", "10.1.1.1", @@ -296,7 +296,7 @@ public class AuthTest { } // 8.1 resolve domain [palo.domain2] - resolver.runOneCycle(); + resolver.runAfterCatalogReady(); Assert.assertTrue(auth.checkPlainPassword(SystemInfoService.DEFAULT_CLUSTER + ":lisi", "20.1.1.1", "123456")); @@ -470,7 +470,7 @@ public class AuthTest { SystemInfoService.DEFAULT_CLUSTER + ":zhangsan", PrivPredicate.ALTER)); - resolver.runOneCycle(); + resolver.runAfterCatalogReady(); Assert.assertTrue(auth.checkDbPriv("10.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db3", SystemInfoService.DEFAULT_CLUSTER + ":zhangsan", @@ -502,7 +502,7 @@ public class AuthTest { SystemInfoService.DEFAULT_CLUSTER + ":zhangsan", PrivPredicate.SELECT)); - resolver.runOneCycle(); + resolver.runAfterCatalogReady(); Assert.assertTrue(auth.checkDbPriv("10.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db3", SystemInfoService.DEFAULT_CLUSTER + ":zhangsan", @@ -714,7 +714,7 @@ public class AuthTest { Assert.assertTrue(auth.checkDbPriv("10.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db3", SystemInfoService.DEFAULT_CLUSTER + ":zhangsan", PrivPredicate.DROP)); - resolver.runOneCycle(); + resolver.runAfterCatalogReady(); Assert.assertFalse(auth.checkDbPriv("10.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db3", SystemInfoService.DEFAULT_CLUSTER + ":zhangsan", PrivPredicate.DROP)); @@ -851,7 +851,7 @@ public class AuthTest { Assert.assertFalse(auth.checkDbPriv("20.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db4", SystemInfoService.DEFAULT_CLUSTER + ":chenliu", PrivPredicate.DROP)); - resolver.runOneCycle(); + resolver.runAfterCatalogReady(); Assert.assertTrue(auth.checkDbPriv("20.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db4", SystemInfoService.DEFAULT_CLUSTER + ":chenliu", PrivPredicate.DROP)); @@ -894,7 +894,7 @@ public class AuthTest { Assert.assertTrue(auth.checkDbPriv("20.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db4", SystemInfoService.DEFAULT_CLUSTER + ":chenliu", PrivPredicate.DROP)); - resolver.runOneCycle(); + resolver.runAfterCatalogReady(); Assert.assertFalse(auth.checkDbPriv("20.1.1.1", SystemInfoService.DEFAULT_CLUSTER + ":db4", SystemInfoService.DEFAULT_CLUSTER + ":chenliu", PrivPredicate.DROP));