[Bug] Fix some bugs of load job scheduler (#4869)
* [Bug] Fix some bugs of load job scheduler: 1. The logic for fixing load meta bugs should be removed as of 0.12. 2. The waiting queue of each load task thread pool should be as long as the desired number of pending jobs. 3. Submit load tasks outside the database lock to avoid holding the lock for a long time.
This commit is contained in:
@ -387,7 +387,9 @@ public class Catalog {
|
||||
|
||||
private TabletChecker tabletChecker;
|
||||
|
||||
private MasterTaskExecutor loadTaskScheduler;
|
||||
// Separate thread pools for pending tasks and loading tasks
|
||||
private MasterTaskExecutor pendingLoadTaskScheduler;
|
||||
private MasterTaskExecutor loadingLoadTaskScheduler;
|
||||
|
||||
private LoadJobScheduler loadJobScheduler;
|
||||
|
||||
@ -537,7 +539,10 @@ public class Catalog {
|
||||
this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat);
|
||||
this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat);
|
||||
|
||||
this.loadTaskScheduler = new MasterTaskExecutor("load_task_scheduler", Config.async_load_task_pool_size, !isCheckpointCatalog);
|
||||
this.pendingLoadTaskScheduler = new MasterTaskExecutor("pending_load_task_scheduler", Config.async_load_task_pool_size,
|
||||
Config.desired_max_waiting_jobs, !isCheckpointCatalog);
|
||||
this.loadingLoadTaskScheduler = new MasterTaskExecutor("loading_load_task_scheduler", Config.async_load_task_pool_size,
|
||||
Config.desired_max_waiting_jobs, !isCheckpointCatalog);
|
||||
this.loadJobScheduler = new LoadJobScheduler();
|
||||
this.loadManager = new LoadManager(loadJobScheduler);
|
||||
this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager);
|
||||
@ -1238,9 +1243,6 @@ public class Catalog {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG.info("start to fix meta data bug");
|
||||
loadManager.fixLoadJobMetaBugs(globalTransactionMgr);
|
||||
}
|
||||
|
||||
// start all daemon threads only running on Master
|
||||
@ -1262,7 +1264,8 @@ public class Catalog {
|
||||
LoadChecker.init(Config.load_checker_interval_second * 1000L);
|
||||
LoadChecker.startAll();
|
||||
// New load scheduler
|
||||
loadTaskScheduler.start();
|
||||
pendingLoadTaskScheduler.start();
|
||||
loadingLoadTaskScheduler.start();
|
||||
loadManager.prepareJobs();
|
||||
loadJobScheduler.start();
|
||||
loadTimeoutChecker.start();
|
||||
@ -4822,8 +4825,12 @@ public class Catalog {
|
||||
return loadManager;
|
||||
}
|
||||
|
||||
public MasterTaskExecutor getLoadTaskScheduler() {
|
||||
return loadTaskScheduler;
|
||||
/**
 * Returns the executor stored in {@code pendingLoadTaskScheduler}.
 */
public MasterTaskExecutor getPendingLoadTaskScheduler() {
|
||||
return pendingLoadTaskScheduler;
|
||||
}
|
||||
|
||||
/**
 * Returns the executor stored in {@code loadingLoadTaskScheduler}.
 */
public MasterTaskExecutor getLoadingLoadTaskScheduler() {
|
||||
return loadingLoadTaskScheduler;
|
||||
}
|
||||
|
||||
public RoutineLoadManager getRoutineLoadManager() {
|
||||
|
||||
@ -40,8 +40,8 @@ import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
import org.apache.doris.transaction.BeginTransactionException;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
import org.apache.doris.transaction.TransactionState.TxnSourceType;
|
||||
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
|
||||
import org.apache.doris.transaction.TransactionState.TxnSourceType;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -93,7 +93,7 @@ public class BrokerLoadJob extends BulkLoadJob {
|
||||
protected void unprotectedExecuteJob() {
|
||||
LoadTask task = new BrokerLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(), brokerDesc);
|
||||
idToTasks.put(task.getSignature(), task);
|
||||
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task);
|
||||
Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(task);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -170,9 +170,9 @@ public class BrokerLoadJob extends BulkLoadJob {
|
||||
|
||||
private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachment) throws UserException {
|
||||
// divide job into broker loading task by table
|
||||
List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList();
|
||||
db.readLock();
|
||||
try {
|
||||
List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList();
|
||||
for (Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : fileGroupAggInfo.getAggKeyToFileGroups().entrySet()) {
|
||||
FileGroupAggKey aggKey = entry.getKey();
|
||||
List<BrokerFileGroup> brokerFileGroups = entry.getValue();
|
||||
@ -208,13 +208,15 @@ public class BrokerLoadJob extends BulkLoadJob {
|
||||
}
|
||||
txnState.addTableIndexes(table);
|
||||
}
|
||||
// submit all tasks together
|
||||
for (LoadTask loadTask : newLoadingTasks) {
|
||||
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
|
||||
}
|
||||
|
||||
} finally {
|
||||
db.readUnlock();
|
||||
}
|
||||
|
||||
// Submit tasks outside the database lock, because submission may take a while if the task queue is full.
|
||||
for (LoadTask loadTask : newLoadingTasks) {
|
||||
Catalog.getCurrentCatalog().getLoadingLoadTaskScheduler().submit(loadTask);
|
||||
}
|
||||
}
|
||||
|
||||
private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) {
|
||||
|
||||
@ -45,7 +45,7 @@ public class BrokerLoadPendingTask extends LoadTask {
|
||||
public BrokerLoadPendingTask(BrokerLoadJob loadTaskCallback,
|
||||
Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups,
|
||||
BrokerDesc brokerDesc) {
|
||||
super(loadTaskCallback);
|
||||
super(loadTaskCallback, TaskType.PENDING);
|
||||
this.retryTime = 3;
|
||||
this.attachment = new BrokerPendingTaskAttachment(signature);
|
||||
this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups;
|
||||
|
||||
@ -28,6 +28,7 @@ import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeMetaVersion;
|
||||
import org.apache.doris.common.LoadException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.util.LogBuilder;
|
||||
@ -43,14 +44,14 @@ import org.apache.doris.qe.SqlModeHelper;
|
||||
import org.apache.doris.transaction.TabletCommitInfo;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
@ -235,8 +236,15 @@ public abstract class BulkLoadJob extends LoadJob {
|
||||
idToTasks.put(loadTask.getSignature(), loadTask);
|
||||
// load id will be added to loadStatistic when executing this task
|
||||
try {
|
||||
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
|
||||
} catch (RejectedExecutionException e) {
|
||||
if (loadTask.getTaskType() == LoadTask.TaskType.PENDING) {
|
||||
Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(loadTask);
|
||||
} else if (loadTask.getTaskType() == LoadTask.TaskType.LOADING) {
|
||||
Catalog.getCurrentCatalog().getLoadingLoadTaskScheduler().submit(loadTask);
|
||||
} else {
|
||||
throw new LoadException(String.format("Unknown load task type: %s. task id: %d, job id, %d",
|
||||
loadTask.getTaskType(), loadTask.getSignature(), id));
|
||||
}
|
||||
} catch (RejectedExecutionException | LoadException e) {
|
||||
unprotectedExecuteCancel(failMsg, true);
|
||||
logFinalOperation();
|
||||
return;
|
||||
|
||||
@ -68,7 +68,7 @@ public class LoadLoadingTask extends LoadTask {
|
||||
long jobDeadlineMs, long execMemLimit, boolean strictMode,
|
||||
long txnId, LoadTaskCallback callback, String timezone,
|
||||
long timeoutS) {
|
||||
super(callback);
|
||||
super(callback, TaskType.LOADING);
|
||||
this.db = db;
|
||||
this.table = table;
|
||||
this.brokerDesc = brokerDesc;
|
||||
|
||||
@ -45,6 +45,7 @@ import org.apache.doris.thrift.TUniqueId;
|
||||
import org.apache.doris.transaction.GlobalTransactionMgr;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
import org.apache.doris.transaction.TransactionStatus;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
@ -702,6 +703,8 @@ public class LoadManager implements Writable{
|
||||
}
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
// Deprecated in version 0.12
|
||||
// This method is only for bug fixing, and should be called after the image and edit log are replayed.
|
||||
public void fixLoadJobMetaBugs(GlobalTransactionMgr txnMgr) {
|
||||
for (LoadJob job : idToLoadJob.values()) {
|
||||
@ -738,11 +741,11 @@ public class LoadManager implements Writable{
|
||||
* replay process. This results in that when the FE restarts, these load jobs
|
||||
* that should have been completed are re-entered into the pending state,
|
||||
* resulting in repeated submission load tasks.
|
||||
*
|
||||
*
|
||||
* Those wrong images are unrecoverable, so that we have to cancel all load jobs
|
||||
* in PENDING or LOADING state when restarting FE, to avoid submit jobs
|
||||
* repeatedly.
|
||||
*
|
||||
*
|
||||
* This code can be removed when upgrading from 0.11.x to a future version.
|
||||
*/
|
||||
if (job.getState() == JobState.LOADING || job.getState() == JobState.PENDING) {
|
||||
@ -773,7 +776,7 @@ public class LoadManager implements Writable{
|
||||
// it would be failed if FE restart.
|
||||
job.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, "fe restart"), false, false);
|
||||
LOG.info("transfer mini load job {} state from {} to CANCELLED, because transaction status is unknown"
|
||||
+ ". label: {}, db: {}",
|
||||
+ ". label: {}, db: {}",
|
||||
job.getId(), prevState, job.getLabel(), job.getDbId());
|
||||
} else {
|
||||
// txn is not found. here are 2 cases:
|
||||
@ -784,7 +787,7 @@ public class LoadManager implements Writable{
|
||||
// removed by expiration).
|
||||
// Without affecting the first case of job, we set the job finish time to be the same as the create time.
|
||||
// In this way, the second type of job will be automatically cleared after running removeOldLoadJob();
|
||||
|
||||
|
||||
// use CancelType.TXN_UNKNOWN, so that we can set the finish time to be the same as the create time
|
||||
job.cancelJobWithoutCheck(new FailMsg(CancelType.TXN_UNKNOWN, "transaction status is unknown"), false, false);
|
||||
LOG.info("finish load job {} from {} to CANCELLED, because transaction status is unknown. label: {}, db: {}, create: {}",
|
||||
|
||||
@ -36,14 +36,21 @@ public abstract class LoadTask extends MasterTask {
|
||||
DELETE
|
||||
}
|
||||
|
||||
/**
 * Kind of a load task. The value is supplied to the {@code LoadTask}
 * constructor and read back through {@code getTaskType()}.
 */
public enum TaskType {
|
||||
PENDING,
|
||||
LOADING
|
||||
}
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(LoadTask.class);
|
||||
|
||||
protected TaskType taskType;
|
||||
protected LoadTaskCallback callback;
|
||||
protected TaskAttachment attachment;
|
||||
protected FailMsg failMsg = new FailMsg();
|
||||
protected int retryTime = 1;
|
||||
|
||||
public LoadTask(LoadTaskCallback callback) {
|
||||
/**
 * Creates a load task of the given type.
 *
 * @param callback stored in {@code this.callback}
 * @param taskType stored in {@code this.taskType}
 */
public LoadTask(LoadTaskCallback callback, TaskType taskType) {
|
||||
this.taskType = taskType;
|
||||
// The signature is taken from the current catalog's next id.
this.signature = Catalog.getCurrentCatalog().getNextId();
|
||||
this.callback = callback;
|
||||
}
|
||||
@ -96,4 +103,8 @@ public abstract class LoadTask extends MasterTask {
|
||||
this.retryTime--;
|
||||
this.signature = Catalog.getCurrentCatalog().getNextId();
|
||||
}
|
||||
|
||||
/**
 * Returns the task type that was passed to the constructor.
 */
public TaskType getTaskType() {
|
||||
return taskType;
|
||||
}
|
||||
}
|
||||
|
||||
@ -83,6 +83,7 @@ import org.apache.doris.transaction.TransactionState;
|
||||
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
|
||||
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
|
||||
import org.apache.doris.transaction.TransactionState.TxnSourceType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -196,7 +197,7 @@ public class SparkLoadJob extends BulkLoadJob {
|
||||
sparkResource, brokerDesc);
|
||||
task.init();
|
||||
idToTasks.put(task.getSignature(), task);
|
||||
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task);
|
||||
Catalog.getCurrentCatalog().getPendingLoadTaskScheduler().submit(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -57,6 +57,7 @@ import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable;
|
||||
import org.apache.doris.load.loadv2.etl.EtlJobConfig.FilePatternVersion;
|
||||
import org.apache.doris.load.loadv2.etl.EtlJobConfig.SourceType;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
@ -88,7 +89,7 @@ public class SparkLoadPendingTask extends LoadTask {
|
||||
public SparkLoadPendingTask(SparkLoadJob loadTaskCallback,
|
||||
Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups,
|
||||
SparkResource resource, BrokerDesc brokerDesc) {
|
||||
super(loadTaskCallback);
|
||||
super(loadTaskCallback, TaskType.PENDING);
|
||||
this.retryTime = 3;
|
||||
this.attachment = new SparkPendingTaskAttachment(signature);
|
||||
this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups;
|
||||
|
||||
@ -17,9 +17,10 @@
|
||||
|
||||
package org.apache.doris.task;
|
||||
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -45,6 +46,12 @@ public class MasterTaskExecutor {
|
||||
scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric);
|
||||
}
|
||||
|
||||
/**
 * Creates an executor whose worker pool is built with an explicit queue size.
 *
 * @param name base name; the worker pool is named {@code name + "_pool"} and
 *             the scheduled pool {@code name + "_scheduler_thread_pool"}
 * @param threadNum thread count passed to {@code newDaemonFixedThreadPool}
 * @param queueSize queue size passed to {@code newDaemonFixedThreadPool}
 *                  (presumably the capacity of the waiting queue; confirm
 *                  against ThreadPoolManager)
 * @param needRegisterMetric forwarded to both pool factory calls
 */
public MasterTaskExecutor(String name, int threadNum, int queueSize, boolean needRegisterMetric) {
|
||||
executor = ThreadPoolManager.newDaemonFixedThreadPool(threadNum, queueSize, name + "_pool", needRegisterMetric);
|
||||
runningTasks = Maps.newHashMap();
|
||||
// Single-thread scheduled pool; start() uses it to run the periodic TaskChecker.
scheduledThreadPool = ThreadPoolManager.newDaemonScheduledThreadPool(1, name + "_scheduler_thread_pool", needRegisterMetric);
|
||||
}
|
||||
|
||||
/**
 * Schedules a {@code TaskChecker} on the scheduled pool at a fixed rate of
 * 1000 ms, with no initial delay.
 */
public void start() {
|
||||
scheduledThreadPool.scheduleAtFixedRate(new TaskChecker(), 0L, 1000L, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@ -61,6 +61,7 @@ import org.apache.doris.transaction.GlobalTransactionMgr;
|
||||
import org.apache.doris.transaction.TabletCommitInfo;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
@ -209,7 +210,7 @@ public class SparkLoadJobTest {
|
||||
pendingTask.init();
|
||||
pendingTask.getSignature();
|
||||
result = pendingTaskId;
|
||||
catalog.getLoadTaskScheduler();
|
||||
catalog.getPendingLoadTaskScheduler();
|
||||
result = executor;
|
||||
executor.submit((SparkLoadPendingTask) any);
|
||||
result = true;
|
||||
|
||||
Reference in New Issue
Block a user