From 4204a878c895c6ccfb3d188ec84a1c42e83b290c Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 10 Nov 2020 10:50:31 +0800 Subject: [PATCH] [Bug] Fix some bugs of load job scheduler (#4869) * [Bug] Fix some bugs of load job scheduler 1. The fix load meta bug logic should be removed since 0.12. 2. The load task thread pool's waiting queue should be as long as desired pending jobs num. 3. Submit the load task outside database lock to prevent holding lock for long time. --- .../org/apache/doris/catalog/Catalog.java | 23 ++++++++++++------- .../doris/load/loadv2/BrokerLoadJob.java | 16 +++++++------ .../load/loadv2/BrokerLoadPendingTask.java | 2 +- .../apache/doris/load/loadv2/BulkLoadJob.java | 18 +++++++++++---- .../doris/load/loadv2/LoadLoadingTask.java | 2 +- .../apache/doris/load/loadv2/LoadManager.java | 11 +++++---- .../apache/doris/load/loadv2/LoadTask.java | 13 ++++++++++- .../doris/load/loadv2/SparkLoadJob.java | 3 ++- .../load/loadv2/SparkLoadPendingTask.java | 3 ++- .../apache/doris/task/MasterTaskExecutor.java | 9 +++++++- .../doris/load/loadv2/SparkLoadJobTest.java | 3 ++- 11 files changed, 72 insertions(+), 31 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 2b73e27ccd..10d184d14e 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -387,7 +387,9 @@ public class Catalog { private TabletChecker tabletChecker; - private MasterTaskExecutor loadTaskScheduler; + // Thread pools for pending and loading task, separately + private MasterTaskExecutor pendingLoadTaskScheduler; + private MasterTaskExecutor loadingLoadTaskScheduler; private LoadJobScheduler loadJobScheduler; @@ -537,7 +539,10 @@ public class Catalog { this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat); this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat); - this.loadTaskScheduler = new MasterTaskExecutor("load_task_scheduler", Config.async_load_task_pool_size, !isCheckpointCatalog); + this.pendingLoadTaskScheduler = new MasterTaskExecutor("pending_load_task_scheduler", Config.async_load_task_pool_size, + Config.desired_max_waiting_jobs, !isCheckpointCatalog); + this.loadingLoadTaskScheduler = new MasterTaskExecutor("loading_load_task_scheduler", Config.async_load_task_pool_size, + Config.desired_max_waiting_jobs, !isCheckpointCatalog); this.loadJobScheduler = new LoadJobScheduler(); this.loadManager = new LoadManager(loadJobScheduler); this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager); @@ -1238,9 +1243,6 @@ public class Catalog { } } } - - LOG.info("start to fix meta data bug"); - loadManager.fixLoadJobMetaBugs(globalTransactionMgr); } // start all daemon threads only running on Master @@ -1262,7 +1264,8 @@ public class Catalog { LoadChecker.init(Config.load_checker_interval_second * 1000L); LoadChecker.startAll(); // New load scheduler - loadTaskScheduler.start(); + pendingLoadTaskScheduler.start(); + loadingLoadTaskScheduler.start(); loadManager.prepareJobs(); loadJobScheduler.start(); loadTimeoutChecker.start(); @@ -4822,8 +4825,12 @@ public class Catalog { return loadManager; } - public MasterTaskExecutor getLoadTaskScheduler() { - return loadTaskScheduler; + public MasterTaskExecutor getPendingLoadTaskScheduler() { + return pendingLoadTaskScheduler; + } + + public MasterTaskExecutor getLoadingLoadTaskScheduler() { + return loadingLoadTaskScheduler; } public RoutineLoadManager getRoutineLoadManager() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index a253dbe919..719a4feb05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -40,8 +40,8 @@ import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.TransactionState; -import org.apache.doris.transaction.TransactionState.TxnSourceType; import org.apache.doris.transaction.TransactionState.TxnCoordinator; +import org.apache.doris.transaction.TransactionState.TxnSourceType; import com.google.common.base.Joiner; import com.google.common.collect.Lists; @@ -93,7 +93,7 @@ public class BrokerLoadJob extends BulkLoadJob { protected void unprotectedExecuteJob() { LoadTask task = new BrokerLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(), brokerDesc); idToTasks.put(task.getSignature(), task); - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task); + Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(task); } /** @@ -170,9 +170,9 @@ public class BrokerLoadJob extends BulkLoadJob { private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachment) throws UserException { // divide job into broker loading task by table + List newLoadingTasks = Lists.newArrayList(); db.readLock(); try { - List newLoadingTasks = Lists.newArrayList(); for (Map.Entry> entry : fileGroupAggInfo.getAggKeyToFileGroups().entrySet()) { FileGroupAggKey aggKey = entry.getKey(); List brokerFileGroups = entry.getValue(); @@ -208,13 +208,15 @@ public class BrokerLoadJob extends BulkLoadJob { } txnState.addTableIndexes(table); } - // submit all tasks together - for (LoadTask loadTask : newLoadingTasks) { - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask); - } + } finally { db.readUnlock(); } + + // Submit task outside the database lock, cause it may take a while if task queue is full. + for (LoadTask loadTask : newLoadingTasks) { + Catalog.getCurrentCatalog().getLoadingLoadTaskScheduler().submit(loadTask); + } } private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java index a4cff888bf..f50c80c123 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java @@ -45,7 +45,7 @@ public class BrokerLoadPendingTask extends LoadTask { public BrokerLoadPendingTask(BrokerLoadJob loadTaskCallback, Map> aggKeyToBrokerFileGroups, BrokerDesc brokerDesc) { - super(loadTaskCallback); + super(loadTaskCallback, TaskType.PENDING); this.retryTime = 3; this.attachment = new BrokerPendingTaskAttachment(signature); this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 1d649c6562..c3a6ba5d6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -28,6 +28,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Table; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.LogBuilder; @@ -43,14 +44,14 @@ import org.apache.doris.qe.SqlModeHelper; import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -235,8 +236,15 @@ public abstract class BulkLoadJob extends LoadJob { idToTasks.put(loadTask.getSignature(), loadTask); // load id will be added to loadStatistic when executing this task try { - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask); - } catch (RejectedExecutionException e) { + if (loadTask.getTaskType() == LoadTask.TaskType.PENDING) { + Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(loadTask); + } else if (loadTask.getTaskType() == LoadTask.TaskType.LOADING) { + Catalog.getCurrentCatalog().getLoadingLoadTaskScheduler().submit(loadTask); + } else { + throw new LoadException(String.format("Unknown load task type: %s. task id: %d, job id, %d", + loadTask.getTaskType(), loadTask.getSignature(), id)); + } + } catch (RejectedExecutionException | LoadException e) { unprotectedExecuteCancel(failMsg, true); logFinalOperation(); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index caa6081e70..85c939f492 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -68,7 +68,7 @@ public class LoadLoadingTask extends LoadTask { long jobDeadlineMs, long execMemLimit, boolean strictMode, long txnId, LoadTaskCallback callback, String timezone, long timeoutS) { - super(callback); + super(callback, TaskType.LOADING); this.db = db; this.table = table; this.brokerDesc = brokerDesc; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 8b42ab7c10..33e3490a8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -45,6 +45,7 @@ import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; + import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -702,6 +703,8 @@ public class LoadManager implements Writable{ } } + @Deprecated + // Deprecated in version 0.12 // This method is only for bug fix. And should be call after image and edit log are replayed. public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) { for (LoadJob job : idToLoadJob.values()) { @@ -738,11 +741,11 @@ public class LoadManager implements Writable{ * replay process. This results in that when the FE restarts, these load jobs * that should have been completed are re-entered into the pending state, * resulting in repeated submission load tasks. - * + * * Those wrong images are unrecoverable, so that we have to cancel all load jobs * in PENDING or LOADING state when restarting FE, to avoid submit jobs * repeatedly. - * + * * This code can be remove when upgrading from 0.11.x to future version. */ if (job.getState() == JobState.LOADING || job.getState() == JobState.PENDING) { @@ -773,7 +776,7 @@ public class LoadManager implements Writable{ // it would be failed if FE restart. job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false); LOG.info("transfer mini load job {} state from {} to CANCELLED, because transaction status is unknown" - + ". label: {}, db: {}", + + ". label: {}, db: {}", job.getId(), prevState, job.getLabel(), job.getDbId()); } else { // txn is not found. here are 2 cases: @@ -784,7 +787,7 @@ public class LoadManager implements Writable{ // removed by expiration). // Without affecting the first case of job, we set the job finish time to be the same as the create time. // In this way, the second type of job will be automatically cleared after running removeOldLoadJob(); - + // use CancelType.UNKNOWN, so that we can set finish time to be the same as the create time job.cancelJobWithoutCheck(new FailMsg(CancelType.TXN_UNKNOWN, "transaction status is unknown"), false, false); LOG.info("finish load job {} from {} to CANCELLED, because transaction status is unknown. label: {}, db: {}, create: {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java index 9a4568a1bc..6ee0cfe4e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadTask.java @@ -36,14 +36,21 @@ public abstract class LoadTask extends MasterTask { DELETE } + public enum TaskType { + PENDING, + LOADING + } + private static final Logger LOG = LogManager.getLogger(LoadTask.class); + protected TaskType taskType; protected LoadTaskCallback callback; protected TaskAttachment attachment; protected FailMsg failMsg = new FailMsg(); protected int retryTime = 1; - public LoadTask(LoadTaskCallback callback) { + public LoadTask(LoadTaskCallback callback, TaskType taskType) { + this.taskType = taskType; this.signature = Catalog.getCurrentCatalog().getNextId(); this.callback = callback; } @@ -96,4 +103,8 @@ public abstract class LoadTask extends MasterTask { this.retryTime--; this.signature = Catalog.getCurrentCatalog().getNextId(); } + + public TaskType getTaskType() { + return taskType; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 9297295748..6ee5d87955 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -83,6 +83,7 @@ import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; import org.apache.doris.transaction.TransactionState.TxnCoordinator; import org.apache.doris.transaction.TransactionState.TxnSourceType; + import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -196,7 +197,7 @@ public class SparkLoadJob extends BulkLoadJob { sparkResource, brokerDesc); task.init(); idToTasks.put(task.getSignature(), task); - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task); + Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(task); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java index dbf5247a43..7445e2b4e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java @@ -57,6 +57,7 @@ import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable; import org.apache.doris.load.loadv2.etl.EtlJobConfig.FilePatternVersion; import org.apache.doris.load.loadv2.etl.EtlJobConfig.SourceType; import org.apache.doris.transaction.TransactionState; + import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -88,7 +89,7 @@ public class SparkLoadPendingTask extends LoadTask { public SparkLoadPendingTask(SparkLoadJob loadTaskCallback, Map> aggKeyToBrokerFileGroups, SparkResource resource, BrokerDesc brokerDesc) { - super(loadTaskCallback); + super(loadTaskCallback, TaskType.PENDING); this.retryTime = 3; this.attachment = new SparkPendingTaskAttachment(signature); this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java index 9b26556a3d..932e6080b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/MasterTaskExecutor.java @@ -17,9 +17,10 @@ package org.apache.doris.task; +import org.apache.doris.common.ThreadPoolManager; + import com.google.common.collect.Maps; -import org.apache.doris.common.ThreadPoolManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,6 +46,12 @@ public class MasterTaskExecutor { scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric); } + public MasterTaskExecutor(String name, int threadNum, int queueSize, boolean needRegisterMetric) { + executor = ThreadPoolManager.newDaemonFixedThreadPool(threadNum, queueSize, name + "_pool", needRegisterMetric); + runningTasks = Maps.newHashMap(); + scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric); + } + public void start() { scheduledThreadPool.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java index 7f263d7a16..c3c64d13ed 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java @@ -61,6 +61,7 @@ import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; + import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -209,7 +210,7 @@ public class SparkLoadJobTest { pendingTask.init(); pendingTask.getSignature(); result = pendingTaskId; - catalog.getLoadTaskScheduler(); + catalog.getPendingLoadTaskScheduler(); result = executor; executor.submit((SparkLoadPendingTask) any); result = true;