Add new scheduler of load in fe (#1076)

* Add new scheduler of load in fe

1. The new scheduler only supports broker load for now.
2. A load job moves through the states PENDING -> LOADING -> FINISHED.
3. The LoadJobScheduler divides a job into a pending task; the preparations needed before loading are done in that pending task.
4. onPendingTaskFinished is invoked after the pending task completes. It submits the loading tasks, which are created from the pending task's attachment.
5. onLoadingTaskFinished is invoked after each loading task completes. It records the commit info and commits the txn once all tasks have finished (a sketch of this flow follows).
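
A minimal, self-contained sketch of this flow (names other than the state values are illustrative, not this patch's actual API):

    // Sketch: PENDING -> LOADING -> FINISHED, with the pending task feeding the loading tasks.
    enum State { PENDING, LOADING, FINISHED, CANCELLED }

    class JobFlowSketch {
        State state = State.PENDING;

        void execute() {                          // called once by the scheduler
            state = State.LOADING;
            String attachment = runPendingTask();
            onPendingTaskFinished(attachment);
        }

        String runPendingTask() {                 // preparations, e.g. listing input files
            return "file statuses per table";
        }

        void onPendingTaskFinished(String attachment) {
            // one loading task per table, built from the pending task's attachment
            Runnable loadingTask = () -> onLoadingTaskFinished();
            loadingTask.run();
        }

        void onLoadingTaskFinished() {            // commit the txn once all tasks finished
            state = State.FINISHED;
        }
    }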

* Combine pendingTask and loadingTask into loadTask

1. The load task callback includes two methods, onTaskFinished and onTaskFailed (see the contract sketch below).
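
A sketch of that two-method contract, where exactly one of the callbacks fires per task (hypothetical names shadowing LoadTaskCallback):

    interface TaskCallbackSketch {
        void onTaskFinished(Object attachment);   // success path
        void onTaskFailed(String errMsg);         // failure path
    }

    abstract class TaskSketch implements Runnable {
        private final TaskCallbackSketch callback;

        TaskSketch(TaskCallbackSketch callback) { this.callback = callback; }

        @Override
        public void run() {
            try {
                Object attachment = executeTask();    // may throw
                callback.onTaskFinished(attachment);  // success: exactly one callback fires
            } catch (Exception e) {
                callback.onTaskFailed(e.getMessage());
            }
        }

        abstract Object executeTask() throws Exception;
    }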

* Add txn callback of load job

1. isCommitting is set to true in the txn's beforeCommitted callback.
2. A job cannot be cancelled while isCommitting is true.
3. A job is finished only after its txn becomes visible.
4. Old jobs are cleaned up when (CurrentTS - FinishedTs) / 1000 > Config.label_keep_max_second.
5. LoadTimeoutChecker runs periodically to cancel timed-out jobs (both time checks are sketched below).
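
Both time checks reduce to plain millisecond arithmetic; a standalone sketch (Config values passed in as parameters):

    final class LoadJobTimeChecks {
        // point 4: a finished job becomes eligible for cleanup (removeOldLoadJob)
        static boolean cleanable(long nowMs, long finishTimestampMs, long labelKeepMaxSecond) {
            return (nowMs - finishTimestampMs) / 1000 > labelKeepMaxSecond;
        }

        // point 5: a running job has timed out (processTimeout), unless its txn is committing
        static boolean timedOut(long nowMs, long createTimestampMs, int timeoutSecond, boolean isCommitting) {
            return !isCommitting && nowMs > createTimestampMs + timeoutSecond * 1000L;
        }
    }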
EmmyMiao87
2019-05-06 13:49:06 +08:00
committed by ZHAO Chun
parent ba78adae94
commit 11be24df40
25 changed files with 1805 additions and 25 deletions

View File: org/apache/doris/catalog/Catalog.java

@ -132,6 +132,8 @@ import org.apache.doris.load.LoadChecker;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.LoadJob.JobState;
import org.apache.doris.load.loadv2.LoadJobScheduler;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.load.routineload.RoutineLoadScheduler;
import org.apache.doris.load.routineload.RoutineLoadTaskScheduler;
@ -175,6 +177,7 @@ import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.task.PullLoadJobMgr;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
@ -260,6 +263,7 @@ public class Catalog {
private ConcurrentHashMap<String, Cluster> nameToCluster;
private Load load;
private LoadManager loadManager;
private RoutineLoadManager routineLoadManager;
private ExportMgr exportMgr;
private Clone clone;
@ -351,6 +355,10 @@ public class Catalog {
private TabletChecker tabletChecker;
private MasterTaskExecutor loadTaskScheduler;
private LoadJobScheduler loadJobScheduler;
private RoutineLoadScheduler routineLoadScheduler;
private RoutineLoadTaskScheduler routineLoadTaskScheduler;
@ -473,6 +481,9 @@ public class Catalog {
this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat);
this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat);
this.loadTaskScheduler = new MasterTaskExecutor(10);
this.loadJobScheduler = new LoadJobScheduler();
this.loadManager = new LoadManager(loadJobScheduler);
this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager);
this.routineLoadTaskScheduler = new RoutineLoadTaskScheduler(routineLoadManager);
}
@ -2037,6 +2048,7 @@ public class Catalog {
protected void runOneCycle() {
load.removeOldLoadJobs();
load.removeOldDeleteJobs();
loadManager.removeOldLoadJob();
exportMgr.removeOldExportJobs();
}
};
@ -4555,6 +4567,14 @@ public class Catalog {
return this.load;
}
public LoadManager getLoadManager() {
return loadManager;
}
public MasterTaskExecutor getLoadTaskScheduler() {
return loadTaskScheduler;
}
public RoutineLoadManager getRoutineLoadManager() {
return routineLoadManager;
}

View File: org/apache/doris/load/BrokerFileGroup.java

@ -54,10 +54,6 @@ public class BrokerFileGroup implements Writable {
// input
private DataDescription dataDescription;
// Now we don't save this in image, only use this to parse DataDescription;
// if we have this require later, we save this here.
private BrokerDesc brokerDesc;
private long tableId;
private String valueSeparator;
private String lineDelimiter;
@ -83,9 +79,8 @@ public class BrokerFileGroup implements Writable {
this.filePathes = table.getPaths();
}
public BrokerFileGroup(DataDescription dataDescription, BrokerDesc desc) {
public BrokerFileGroup(DataDescription dataDescription) {
this.dataDescription = dataDescription;
this.brokerDesc = desc;
exprColumnMap = dataDescription.getParsedExprMap();
}
@ -164,6 +159,10 @@ public class BrokerFileGroup implements Writable {
return partitionIds;
}
public List<String> getPartitionNames() {
return dataDescription.getPartitionNames();
}
public List<String> getFilePathes() {
return filePathes;
}

View File: org/apache/doris/load/EtlStatus.java

@ -81,6 +81,10 @@ public class EtlStatus implements Writable {
return counters;
}
public void replaceCounter(String key, String value) {
counters.put(key, value);
}
public void setCounters(Map<String, String> counters) {
this.counters = counters;
}
@ -93,6 +97,16 @@ public class EtlStatus implements Writable {
this.fileMap = fileMap;
}
public void addAllFileMap(Map<String, Long> fileMap) {
this.fileMap.putAll(fileMap);
}
public void reset() {
this.stats.clear();
this.counters.clear();
this.fileMap.clear();
}
@Override
public String toString() {
return "EtlTaskStatus [state=" + state + ", trackingUrl=" + trackingUrl + ", stats=" + stats + ", counters="

View File: org/apache/doris/load/Load.java

@ -519,7 +519,7 @@ public class Load {
Map<Long, Map<Long, List<Source>>> tableToPartitionSources = Maps.newHashMap();
for (DataDescription dataDescription : dataDescriptions) {
// create source
createSource(db, dataDescription, tableToPartitionSources, job.getDeleteFlag());
checkAndCreateSource(db, dataDescription, tableToPartitionSources, job.getDeleteFlag());
job.addTableName(dataDescription.getTableName());
}
for (Entry<Long, Map<Long, List<Source>>> tableEntry : tableToPartitionSources.entrySet()) {
@ -538,7 +538,7 @@ public class Load {
if (etlJobType == EtlJobType.BROKER) {
PullLoadSourceInfo sourceInfo = new PullLoadSourceInfo();
for (DataDescription dataDescription : dataDescriptions) {
BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription, stmt.getBrokerDesc());
BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription);
fileGroup.parse(db);
sourceInfo.addFileGroup(fileGroup);
}
@ -646,8 +646,10 @@ public class Load {
return job;
}
private void createSource(Database db, DataDescription dataDescription,
Map<Long, Map<Long, List<Source>>> tableToPartitionSources, boolean deleteFlag) throws DdlException {
public static void checkAndCreateSource(Database db, DataDescription dataDescription,
Map<Long, Map<Long, List<Source>>> tableToPartitionSources,
boolean deleteFlag)
throws DdlException {
Source source = new Source(dataDescription.getFilePathes());
long tableId = -1;
Set<Long> sourcePartitionIds = Sets.newHashSet();
@ -846,7 +848,7 @@ public class Load {
checkMini = false;
}
isLabelUsed(dbId, label, -1, checkMini);
unprotectIsLabelUsed(dbId, label, -1, checkMini);
// add job
Map<String, List<LoadJob>> labelToLoadJobs = null;
@ -976,7 +978,7 @@ public class Load {
long dbId = db.getId();
writeLock();
try {
if (isLabelUsed(dbId, label, timestamp, true)) {
if (unprotectIsLabelUsed(dbId, label, timestamp, true)) {
// label is used and this is a retry request.
// no need to do further operation, just return.
return false;
@ -1020,6 +1022,15 @@ public class Load {
}
}
public boolean isLabelUsed(long dbId, String label) throws DdlException {
readLock();
try {
return unprotectIsLabelUsed(dbId, label, -1, false);
} finally {
readUnlock();
}
}
/*
* 1. if label is already used, and this is not a retry request,
* throw exception ("Label already used")
@ -1028,7 +1039,7 @@ public class Load {
* 3. if label is not used, return false
* 4. throw exception if encounter error.
*/
private boolean isLabelUsed(long dbId, String label, long timestamp, boolean checkMini)
private boolean unprotectIsLabelUsed(long dbId, String label, long timestamp, boolean checkMini)
throws DdlException {
// check dbLabelToLoadJobs
if (dbLabelToLoadJobs.containsKey(dbId)) {
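
Note: the rename to unprotectIsLabelUsed follows the convention that unprotect* methods assume the caller already holds the lock, while the public wrapper acquires it. A minimal sketch of the pattern:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    final class LabelRegistrySketch {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        // public entry point: takes the lock itself
        public boolean isLabelUsed(String label) {
            lock.readLock().lock();
            try {
                return unprotectIsLabelUsed(label);
            } finally {
                lock.readLock().unlock();
            }
        }

        // "unprotect" method: the caller must already hold the lock
        private boolean unprotectIsLabelUsed(String label) {
            return false; // real lookup elided
        }
    }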

View File: org/apache/doris/load/loadv2/BrokerLoadJob.java

@ -0,0 +1,265 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.PullLoadSourceInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
/**
* There are 3 steps in BrokerLoadJob: BrokerPendingTask, LoadLoadingTask, CommitAndPublishTxn.
* Step1: the BrokerPendingTask is created by the executeJob method.
* Step2: the LoadLoadingTasks are created by onTaskFinished when the BrokerPendingTask is finished.
* Step3: the txn is committed and published by onTaskFinished when all of the LoadLoadingTasks are finished.
*/
public class BrokerLoadJob extends LoadJob {
private static final Logger LOG = LogManager.getLogger(BrokerLoadJob.class);
private BrokerDesc brokerDesc;
// include broker desc and data desc
private PullLoadSourceInfo dataSourceInfo = new PullLoadSourceInfo();
// it will be set to true when pending task finished
private boolean isLoading = false;
public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc) {
super(dbId, label);
this.timeoutSecond = Config.pull_load_task_default_timeout_second;
this.brokerDesc = brokerDesc;
}
public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException {
// get db id
String dbName = stmt.getLabel().getDbName();
Database db = Catalog.getCurrentCatalog().getDb(stmt.getLabel().getDbName());
if (db == null) {
throw new DdlException("Database[" + dbName + "] does not exist");
}
// create job
BrokerLoadJob brokerLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(),
stmt.getBrokerDesc());
brokerLoadJob.setJobProperties(stmt.getProperties());
brokerLoadJob.checkDataSourceInfo(db, stmt.getDataDescriptions());
brokerLoadJob.setDataSourceInfo(db, stmt.getDataDescriptions());
return brokerLoadJob;
}
private void setDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
for (DataDescription dataDescription : dataDescriptions) {
BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription);
fileGroup.parse(db);
dataSourceInfo.addFileGroup(fileGroup);
}
}
@Override
public void executeJob() {
LoadTask task = new BrokerLoadPendingTask(this, dataSourceInfo.getIdToFileGroups(), brokerDesc);
tasks.add(task);
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task);
}
/**
* Situation 1: when the attachment is an instance of BrokerPendingTaskAttachment, this method is called by the broker pending task;
* the LoadLoadingTasks are created after the BrokerPendingTask is finished.
* Situation 2: when the attachment is an instance of BrokerLoadingTaskAttachment, this method is called by a LoadLoadingTask;
* the txn is committed after all of the LoadLoadingTasks are finished.
*
* @param attachment
*/
@Override
public void onTaskFinished(TaskAttachment attachment) {
if (attachment instanceof BrokerPendingTaskAttachment) {
onPendingTaskFinished((BrokerPendingTaskAttachment) attachment);
} else if (attachment instanceof BrokerLoadingTaskAttachment) {
onLoadingTaskFinished((BrokerLoadingTaskAttachment) attachment);
}
}
@Override
public void onTaskFailed(String errMsg) {
cancelJobWithoutCheck(FailMsg.CancelType.LOAD_RUN_FAIL, errMsg);
}
/**
* step1: divide job into loading task
* step2: init the plan of task
* step3: submit tasks into loadingTaskExecutor
* @param attachment BrokerPendingTaskAttachment
*/
private void onPendingTaskFinished(BrokerPendingTaskAttachment attachment) {
writeLock();
try {
// check if job has been cancelled
if (isFinished()) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("error_msg", "this task will be ignored when job is finished")
.build());
return;
}
if (isLoading) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("error_msg", "this is a duplicated callback of pending task "
+ "when broker already has loading task")
.build());
return;
}
isLoading = true;
} finally {
writeUnlock();
}
Database db = null;
try {
db = getDb();
createLoadingTask(db, attachment);
} catch (UserException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("database_id", dbId)
.add("error_msg", "Failed to divide job into loading task.")
.build(), e);
cancelJobWithoutCheck(FailMsg.CancelType.ETL_RUN_FAIL, e.getMessage());
return;
}
loadStartTimestamp = System.currentTimeMillis();
}
private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachment) throws UserException {
// divide job into broker loading task by table
db.readLock();
try {
for (Map.Entry<Long, List<BrokerFileGroup>> entry :
dataSourceInfo.getIdToFileGroups().entrySet()) {
long tableId = entry.getKey();
OlapTable table = (OlapTable) db.getTable(tableId);
if (table == null) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("database_id", dbId)
.add("table_id", tableId)
.add("error_msg", "Failed to divide job into loading task when table not found")
.build());
throw new MetaNotFoundException("Failed to divide job into loading task when table "
+ tableId + " not found");
}
// Generate loading task and init the plan of task
LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
entry.getValue(), getDeadlineMs(), execMemLimit,
transactionId, this);
task.init(attachment.getFileStatusByTable(tableId),
attachment.getFileNumByTable(tableId));
// Add tasks into list and pool
tasks.add(task);
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task);
}
} finally {
db.readUnlock();
}
}
private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) {
writeLock();
try {
// check if job has been cancelled
if (isFinished()) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("error_msg", "this task will be ignored when job is finished")
.build());
return;
}
// update loading status
updateLoadingStatus(attachment);
// begin commit txn when all of loading tasks have been finished
if (!(tasks.size() == tasks.stream()
.filter(entity -> entity.isFinished()).count())) {
return;
}
// check data quality
if (!checkDataQuality()) {
executeCancel(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG);
return;
}
} finally {
writeUnlock();
}
try {
Catalog.getCurrentGlobalTransactionMgr().commitTransaction(
dbId, transactionId, commitInfos);
} catch (UserException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("database_id", dbId)
.add("error_msg", "Failed to commit txn.")
.build(), e);
cancelJobWithoutCheck(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage());
return;
}
}
private void updateLoadingStatus(BrokerLoadingTaskAttachment attachment) {
loadingStatus.replaceCounter(DPP_ABNORMAL_ALL,
increaseCounter(DPP_ABNORMAL_ALL, attachment.getCounter(DPP_ABNORMAL_ALL)));
loadingStatus.replaceCounter(DPP_NORMAL_ALL,
increaseCounter(DPP_NORMAL_ALL, attachment.getCounter(DPP_NORMAL_ALL)));
if (attachment.getTrackingUrl() != null) {
loadingStatus.setTrackingUrl(attachment.getTrackingUrl());
}
commitInfos.addAll(attachment.getCommitInfoList());
int finishedTaskNum = (int) tasks.stream().filter(entity -> entity.isFinished()).count();
// multiply before dividing: integer division would keep progress at 0 until every task finished
progress = finishedTaskNum * 100 / tasks.size();
if (progress == 100) {
    progress = 99;
}
}
private String increaseCounter(String key, String deltaValue) {
    // the counter may not exist yet on the first callback; default to 0 instead of throwing
    long value = 0;
    if (loadingStatus.getCounters().containsKey(key)) {
        value = Long.parseLong(loadingStatus.getCounters().get(key));
    }
    if (deltaValue != null) {
        value += Long.parseLong(deltaValue);
    }
    return String.valueOf(value);
}
}

View File: org/apache/doris/load/loadv2/BrokerLoadPendingTask.java

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.thrift.TBrokerFileStatus;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
public class BrokerLoadPendingTask extends LoadTask {
private static final Logger LOG = LogManager.getLogger(BrokerLoadPendingTask.class);
private Map<Long, List<BrokerFileGroup>> tableToBrokerFileList;
private BrokerDesc brokerDesc;
public BrokerLoadPendingTask(LoadTaskCallback loadTaskCallback,
Map<Long, List<BrokerFileGroup>> tableToBrokerFileList,
BrokerDesc brokerDesc) {
super(loadTaskCallback);
this.attachment = new BrokerPendingTaskAttachment();
this.tableToBrokerFileList = tableToBrokerFileList;
this.brokerDesc = brokerDesc;
}
@Override
void executeTask() throws UserException {
getAllFileStatus();
}
private void getAllFileStatus()
throws UserException {
for (Map.Entry<Long, List<BrokerFileGroup>> entry : tableToBrokerFileList.entrySet()) {
long tableId = entry.getKey();
List<List<TBrokerFileStatus>> fileStatusList = Lists.newArrayList();
List<BrokerFileGroup> fileGroups = entry.getValue();
for (BrokerFileGroup fileGroup : fileGroups) {
List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
for (String path : fileGroup.getFilePathes()) {
BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses);
}
fileStatusList.add(fileStatuses);
if (LOG.isDebugEnabled()) {
for (TBrokerFileStatus fstatus : fileStatuses) {
LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
.add("file_status", fstatus)
.build());
}
}
}
((BrokerPendingTaskAttachment) attachment).addFileStatus(tableId, fileStatusList);
}
}
}

View File: org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.load.loadv2;
import org.apache.doris.transaction.TabletCommitInfo;
import java.util.List;
import java.util.Map;
public class BrokerLoadingTaskAttachment implements TaskAttachment {
private Map<String, String> counters;
private String trackingUrl;
private List<TabletCommitInfo> commitInfoList;
public BrokerLoadingTaskAttachment(Map<String, String> counters, String trackingUrl,
List<TabletCommitInfo> commitInfoList) {
this.trackingUrl = trackingUrl;
this.counters = counters;
this.commitInfoList = commitInfoList;
}
public String getCounter(String key) {
return counters.get(key);
}
public String getTrackingUrl() {
return trackingUrl;
}
public List<TabletCommitInfo> getCommitInfoList() {
return commitInfoList;
}
}

View File: org/apache/doris/load/loadv2/BrokerPendingTaskAttachment.java

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.load.loadv2;
import org.apache.doris.thrift.TBrokerFileStatus;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
public class BrokerPendingTaskAttachment implements TaskAttachment {
// table id -> file status
private Map<Long, List<List<TBrokerFileStatus>>> fileStatusMap = Maps.newHashMap();
// table id -> total file num
private Map<Long, Integer> fileNumMap = Maps.newHashMap();
public void addFileStatus(long tableId, List<List<TBrokerFileStatus>> fileStatusList) {
fileStatusMap.put(tableId, fileStatusList);
fileNumMap.put(tableId, fileStatusList.stream().mapToInt(entity -> entity.size()).sum());
}
public List<List<TBrokerFileStatus>> getFileStatusByTable(long tableId) {
return fileStatusMap.get(tableId);
}
public int getFileNumByTable(long tableId) {
return fileNumMap.get(tableId);
}
}

View File: org/apache/doris/load/loadv2/JobState.java

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.load.loadv2;
public enum JobState {
PENDING,
LOADING,
FINISHED,
CANCELLED
}

View File: org/apache/doris/load/loadv2/LoadJob.java

@ -0,0 +1,436 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.Load;
import org.apache.doris.load.Source;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public abstract class LoadJob extends AbstractTxnStateChangeCallback implements LoadTaskCallback {
private static final Logger LOG = LogManager.getLogger(LoadJob.class);
protected static final String QUALITY_FAIL_MSG = "quality not good enough to cancel";
protected static final String DPP_NORMAL_ALL = "dpp.norm.ALL";
protected static final String DPP_ABNORMAL_ALL = "dpp.abnorm.ALL";
protected long id = Catalog.getCurrentCatalog().getNextId();
protected long dbId;
protected String label;
protected JobState state = JobState.PENDING;
// optional properties
// timeout second need to be reset in constructor of subclass
protected int timeoutSecond = Config.pull_load_task_default_timeout_second;
protected long execMemLimit = 2147483648L; // 2GB;
protected double maxFilterRatio = 0;
protected boolean deleteFlag = false;
protected long createTimestamp = System.currentTimeMillis();
protected long loadStartTimestamp = -1;
protected long finishTimestamp = -1;
protected long transactionId;
protected FailMsg failMsg;
protected List<LoadTask> tasks = Lists.newArrayList();
protected EtlStatus loadingStatus = new EtlStatus();
// 0: the job status is pending
// n/100: n is the number of tasks that have finished
// 99: all tasks have finished
// 100: the txn is visible and the load job is finished
protected int progress;
protected List<TabletCommitInfo> commitInfos = Lists.newArrayList();
// non-persistence
protected boolean isCommitting = false;
protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
public LoadJob(long dbId, String label) {
this.dbId = dbId;
this.label = label;
}
protected void readLock() {
lock.readLock().lock();
}
protected void readUnlock() {
lock.readLock().unlock();
}
protected void writeLock() {
lock.writeLock().lock();
}
protected void writeUnlock() {
lock.writeLock().unlock();
}
public long getId() {
return id;
}
public Database getDb() throws MetaNotFoundException {
// get db
Database db = Catalog.getInstance().getDb(dbId);
if (db == null) {
throw new MetaNotFoundException("Database " + dbId + " already has been deleted");
}
return db;
}
public long getDbId() {
return dbId;
}
public String getLabel() {
return label;
}
public JobState getState() {
return state;
}
public long getDeadlineMs() {
return createTimestamp + timeoutSecond * 1000;
}
public long getLeftTimeMs() {
return getDeadlineMs() - System.currentTimeMillis();
}
public long getFinishTimestamp() {
return finishTimestamp;
}
public boolean isFinished() {
readLock();
try {
return state == JobState.FINISHED || state == JobState.CANCELLED;
} finally {
readUnlock();
}
}
protected void setJobProperties(Map<String, String> properties) throws DdlException {
// resource info
if (ConnectContext.get() != null) {
execMemLimit = ConnectContext.get().getSessionVariable().getMaxExecMemByte();
}
// job properties
if (properties != null) {
if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
try {
timeoutSecond = Integer.parseInt(properties.get(LoadStmt.TIMEOUT_PROPERTY));
} catch (NumberFormatException e) {
throw new DdlException("Timeout is not INT", e);
}
}
if (properties.containsKey(LoadStmt.MAX_FILTER_RATIO_PROPERTY)) {
try {
maxFilterRatio = Double.parseDouble(properties.get(LoadStmt.MAX_FILTER_RATIO_PROPERTY));
} catch (NumberFormatException e) {
throw new DdlException("Max filter ratio is not DOUBLE", e);
}
}
if (properties.containsKey(LoadStmt.LOAD_DELETE_FLAG_PROPERTY)) {
String flag = properties.get(LoadStmt.LOAD_DELETE_FLAG_PROPERTY);
if (flag.equalsIgnoreCase("true") || flag.equalsIgnoreCase("false")) {
deleteFlag = Boolean.parseBoolean(flag);
} else {
throw new DdlException("Value of delete flag is invalid");
}
}
if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) {
try {
execMemLimit = Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT));
} catch (NumberFormatException e) {
throw new DdlException("Execute memory limit is not Long", e);
}
}
}
}
protected void checkDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
for (DataDescription dataDescription : dataDescriptions) {
// loadInfo is a temporary param for the method of checkAndCreateSource.
// <TableId,<PartitionId,<LoadInfoList>>>
Map<Long, Map<Long, List<Source>>> loadInfo = Maps.newHashMap();
// only support broker load now
Load.checkAndCreateSource(db, dataDescription, loadInfo, false);
}
}
public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException {
// register txn state listener
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
transactionId = Catalog.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, label, -1, "FE: " + FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.FRONTEND, id,
timeoutSecond - 1);
}
/**
* create pending task for load job and add pending task into pool
* if job has been cancelled, this step will be ignored
* @throws LabelAlreadyUsedException if the label is duplicated
* @throws BeginTransactionException if the limit of running load jobs is exceeded
* @throws AnalysisException if there are invalid params in the job
*/
public void execute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException {
writeLock();
try {
// check if job state is pending
if (state != JobState.PENDING) {
return;
}
// the load job count limit is enforced when the txn begins
beginTxn();
executeJob();
unprotectedUpdateState(JobState.LOADING);
} finally {
writeUnlock();
}
}
public void processTimeout() {
writeLock();
try {
if (isFinished() || getDeadlineMs() >= System.currentTimeMillis() || isCommitting) {
return;
}
executeCancel(FailMsg.CancelType.TIMEOUT, "loading timeout to cancel");
} finally {
writeUnlock();
}
}
abstract void executeJob();
public void updateState(JobState jobState) {
writeLock();
try {
unprotectedUpdateState(jobState);
} finally {
writeUnlock();
}
// TODO(ML): edit log
}
protected void unprotectedUpdateState(JobState jobState) {
switch (jobState) {
case LOADING:
executeLoad();
break;
case FINISHED:
    executeFinish();
    break;
default:
    break;
}
}
private void executeLoad() {
loadStartTimestamp = System.currentTimeMillis();
state = JobState.LOADING;
}
public void cancelJobWithoutCheck(FailMsg.CancelType cancelType, String errMsg) {
writeLock();
try {
executeCancel(cancelType, errMsg);
} finally {
writeUnlock();
}
}
public void cancelJob(FailMsg.CancelType cancelType, String errMsg) throws DdlException {
writeLock();
try {
if (isCommitting) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("error_msg", "The txn which belongs to job is committing. "
+ "The job could not be cancelled in this step").build());
throw new DdlException("Job could not be cancelled while txn is committing");
}
executeCancel(cancelType, errMsg);
} finally {
writeUnlock();
}
}
protected void executeCancel(FailMsg.CancelType cancelType, String errMsg) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("error_msg", "Failed to execute load with error " + errMsg)
.build());
// reset txn id
if (transactionId != -1) {
transactionId = -1;
}
// clean the loadingStatus
loadingStatus.reset();
loadingStatus.setState(TEtlState.CANCELLED);
// tasks will not be removed from the task pool;
// they will be discarded at the onTaskFinished or onTaskFailed stage.
tasks.clear();
// set failMsg and state
failMsg = new FailMsg(cancelType, errMsg);
finishTimestamp = System.currentTimeMillis();
state = JobState.CANCELLED;
// remove callback
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id);
}
private void executeFinish() {
progress = 100;
finishTimestamp = System.currentTimeMillis();
state = JobState.FINISHED;
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id);
MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
}
protected boolean checkDataQuality() {
Map<String, String> counters = loadingStatus.getCounters();
if (!counters.containsKey(DPP_NORMAL_ALL) || !counters.containsKey(DPP_ABNORMAL_ALL)) {
return true;
}
long normalNum = Long.parseLong(counters.get(DPP_NORMAL_ALL));
long abnormalNum = Long.parseLong(counters.get(DPP_ABNORMAL_ALL));
if (abnormalNum > (abnormalNum + normalNum) * maxFilterRatio) {
return false;
}
return true;
}
@Override
public long getCallbackId() {
return id;
}
@Override
public void beforeCommitted(TransactionState txnState) throws TransactionException {
writeLock();
try {
if (transactionId == -1) {
throw new TransactionException("txn could not be committed when job has been cancelled");
}
isCommitting = true;
} finally {
writeUnlock();
}
}
@Override
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws UserException {
if (txnOperated) {
return;
}
writeLock();
try {
isCommitting = false;
} finally {
writeUnlock();
}
}
@Override
public void replayOnCommitted(TransactionState txnState) {
writeLock();
try {
isCommitting = true;
} finally {
writeUnlock();
}
}
@Override
public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason)
throws UserException {
if (!txnOperated) {
return;
}
writeLock();
try {
if (transactionId == -1) {
return;
}
// cancel load job
executeCancel(FailMsg.CancelType.LOAD_RUN_FAIL, txnStatusChangeReason);
} finally {
writeUnlock();
}
}
@Override
public void replayOnAborted(TransactionState txnState) {
cancelJobWithoutCheck(FailMsg.CancelType.LOAD_RUN_FAIL, null);
}
@Override
public void afterVisible(TransactionState txnState, boolean txnOperated) {
updateState(JobState.FINISHED);
}
}
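
Note: the txn hooks above fire in a fixed order; isCommitting is the gate that makes cancellation and commit mutually exclusive. A simplified sketch of the gating (no locking shown):

    final class TxnGateSketch {
        boolean isCommitting = false;
        boolean finished = false;

        void beforeCommitted() { isCommitting = true; }    // from here on, cancel is rejected
        void afterCommitted(boolean txnOperated) {
            if (!txnOperated) { isCommitting = false; }    // commit failed: cancellable again
        }
        void afterVisible() { finished = true; }           // job finishes only once data is visible

        void cancel() {
            if (isCommitting) {
                throw new IllegalStateException("job could not be cancelled while txn is committing");
            }
            // abort tasks, mark CANCELLED ...
        }
    }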

View File: org/apache/doris/load/loadv2/LoadJobScheduler.java

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.load.loadv2;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.FailMsg;
import org.apache.doris.transaction.BeginTransactionException;
import com.google.common.collect.Queues;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.LinkedBlockingQueue;
/**
* LoadJobScheduler schedules the pending LoadJobs which belong to the LoadManager.
* It calls the execute method of each LoadJob.
* The state of a LoadJob changes to LOADING after it has been scheduled.
public class LoadJobScheduler extends Daemon {
private static final Logger LOG = LogManager.getLogger(LoadJobScheduler.class);
private LinkedBlockingQueue<LoadJob> needScheduleJobs = Queues.newLinkedBlockingQueue();
public LoadJobScheduler() {
super("Load job scheduler", Config.load_checker_interval_second * 1000);
}
@Override
protected void runOneCycle() {
try {
process();
} catch (Throwable e) {
LOG.warn("Failed to process one round of LoadJobScheduler with error message {}", e.getMessage(), e);
}
}
private void process() throws InterruptedException {
while (true) {
// take one load job from queue
LoadJob loadJob = needScheduleJobs.poll();
if (loadJob == null) {
return;
}
// schedule job
try {
loadJob.execute();
} catch (LabelAlreadyUsedException | AnalysisException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
.add("error_msg", "There are error properties in job. Job will be cancelled")
.build(), e);
loadJob.cancelJobWithoutCheck(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage());
continue;
} catch (BeginTransactionException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
.add("error_msg", "Failed to begin txn when job is scheduling. "
+ "Job will be rescheduled later")
.build(), e);
needScheduleJobs.put(loadJob);
return;
}
}
}
public void submitJob(LoadJob job) {
needScheduleJobs.add(job);
}
}
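
Note: the drain loop above re-queues a job only for the transient begin-txn failure; every other error cancels the job. A standalone sketch of that retry-by-requeue shape:

    import java.util.concurrent.LinkedBlockingQueue;

    final class DrainLoopSketch {
        private final LinkedBlockingQueue<Runnable> pending = new LinkedBlockingQueue<>();

        void runOneCycle() throws InterruptedException {
            while (true) {
                Runnable job = pending.poll();       // non-blocking take
                if (job == null) {
                    return;                          // queue drained; wait for the next cycle
                }
                try {
                    job.run();                       // stands in for LoadJob.execute()
                } catch (IllegalStateException transientFailure) {
                    pending.put(job);                // e.g. txn limit reached: retry next cycle
                    return;
                }
                // fatal errors would cancel the job instead of re-queueing it
            }
        }
    }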

View File: org/apache/doris/load/loadv2/LoadLoadingTask.java

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.task.MasterTask;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TabletCommitInfo;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.UUID;
public class LoadLoadingTask extends LoadTask {
private static final Logger LOG = LogManager.getLogger(LoadLoadingTask.class);
private final Database db;
private final OlapTable table;
private final BrokerDesc brokerDesc;
private final List<BrokerFileGroup> fileGroups;
private final long jobDeadlineMs;
private final long execMemLimit;
private final long txnId;
private LoadingTaskPlanner planner;
private String errMsg;
public LoadLoadingTask(Database db, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
long jobDeadlineMs, long execMemLimit, long txnId, LoadTaskCallback callback) {
super(callback);
this.db = db;
this.table = table;
this.brokerDesc = brokerDesc;
this.fileGroups = fileGroups;
this.jobDeadlineMs = jobDeadlineMs;
this.execMemLimit = execMemLimit;
this.txnId = txnId;
}
public void init(List<List<TBrokerFileStatus>> fileStatusList, int fileNum) throws UserException {
planner = new LoadingTaskPlanner(txnId, db.getId(), table, brokerDesc, fileGroups);
planner.plan(fileStatusList, fileNum);
}
@Override
protected void executeTask() throws UserException {
int retryTime = 3;
for (int i = 0; i < retryTime; ++i) {
isFinished = executeOnce();
if (isFinished) {
return;
}
}
throw new UserException(errMsg);
}
private boolean executeOnce() {
// Generate a new query id.
UUID uuid = UUID.randomUUID();
TUniqueId executeId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
Coordinator curCoordinator = new Coordinator(executeId, planner.getDescTable(),
planner.getFragments(), planner.getScanNodes(), db.getClusterName());
curCoordinator.setQueryType(TQueryType.LOAD);
curCoordinator.setExecMemoryLimit(execMemLimit);
curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000));
try {
QeProcessorImpl.INSTANCE
.registerQuery(executeId, curCoordinator);
return actualExecute(curCoordinator);
} catch (UserException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
.add("error_msg", "failed to execute loading task")
.build(), e);
errMsg = e.getMessage();
return false;
} finally {
QeProcessorImpl.INSTANCE.unregisterQuery(executeId);
}
}
private boolean actualExecute(Coordinator curCoordinator) {
int waitSecond = (int) (getLeftTimeMs() / 1000);
if (waitSecond <= 0) {
errMsg = "time out";
return false;
}
try {
curCoordinator.exec();
} catch (Exception e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
.add("error_msg", "coordinator execute failed")
.build(), e);
errMsg = "coordinator execute failed with error " + e.getMessage();
return false;
}
if (curCoordinator.join(waitSecond)) {
Status status = curCoordinator.getExecStatus();
if (status.ok()) {
attachment = new BrokerLoadingTaskAttachment(curCoordinator.getLoadCounters(),
curCoordinator.getTrackingUrl(),
TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()));
return true;
} else {
errMsg = status.getErrorMsg();
return false;
}
} else {
errMsg = "coordinator could not finished before job timeout";
return false;
}
}
private long getLeftTimeMs() {
return jobDeadlineMs - System.currentTimeMillis();
}
}
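
Note: all three attempts in executeTask share the job's deadline rather than getting a fresh per-attempt timeout; once getLeftTimeMs() is exhausted, retries are pointless. A sketch of that shared-budget loop:

    import java.util.function.BooleanSupplier;

    final class DeadlineRetrySketch {
        static boolean run(long jobDeadlineMs, BooleanSupplier attempt) {
            for (int i = 0; i < 3; i++) {
                if (jobDeadlineMs - System.currentTimeMillis() <= 0) {
                    return false;        // budget exhausted: no more retries
                }
                if (attempt.getAsBoolean()) {
                    return true;         // a single successful attempt finishes the task
                }
            }
            return false;
        }
    }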

View File: org/apache/doris/load/loadv2/LoadManager.java

@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.transaction.TransactionState;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
* The broker and mini load jobs (v2) are included in this class.
*/
public class LoadManager {
private Map<Long, LoadJob> idToLoadJob = Maps.newConcurrentMap();
private Map<Long, Map<String, List<LoadJob>>> dbIdToLabelToLoadJobs = Maps.newConcurrentMap();
private LoadJobScheduler loadJobScheduler;
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
public LoadManager(LoadJobScheduler loadJobScheduler) {
this.loadJobScheduler = loadJobScheduler;
}
/**
* For now this method is only invoked by broker load (v2).
* @param stmt
* @throws DdlException
*/
public void createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
Database database = checkDb(stmt.getLabel().getDbName());
long dbId = database.getId();
writeLock();
try {
isLabelUsed(dbId, stmt.getLabel().getLabelName());
if (stmt.getBrokerDesc() == null) {
throw new DdlException("LoadManager only support the broker load.");
}
BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(stmt);
addLoadJob(brokerLoadJob);
} finally {
writeUnlock();
}
}
private void addLoadJob(LoadJob loadJob) {
idToLoadJob.put(loadJob.getId(), loadJob);
long dbId = loadJob.getDbId();
if (!dbIdToLabelToLoadJobs.containsKey(dbId)) {
dbIdToLabelToLoadJobs.put(loadJob.getDbId(), new ConcurrentHashMap<>());
}
if (!dbIdToLabelToLoadJobs.get(dbId).containsKey(loadJob.getLabel())) {
dbIdToLabelToLoadJobs.get(dbId).put(loadJob.getLabel(), new ArrayList<>());
}
dbIdToLabelToLoadJobs.get(dbId).get(loadJob.getLabel()).add(loadJob);
// submit it
loadJobScheduler.submitJob(loadJob);
}
public List<LoadJob> getLoadJobByState(JobState jobState) {
return idToLoadJob.values().stream()
.filter(entity -> entity.getState() == jobState)
.collect(Collectors.toList());
}
public void removeOldLoadJob() {
long currentTimeMs = System.currentTimeMillis();
writeLock();
try {
Iterator<Map.Entry<Long, LoadJob>> iter = idToLoadJob.entrySet().iterator();
while (iter.hasNext()) {
LoadJob job = iter.next().getValue();
if (job.isFinished()
&& ((currentTimeMs - job.getFinishTimestamp()) / 1000 > Config.label_keep_max_second)) {
iter.remove();
dbIdToLabelToLoadJobs.get(job.getDbId()).get(job.getLabel()).remove(job);
}
}
} finally {
writeUnlock();
}
}
public void processTimeoutJobs() {
idToLoadJob.values().stream().forEach(entity -> entity.processTimeout());
}
private Database checkDb(String dbName) throws DdlException {
// get db
Database db = Catalog.getInstance().getDb(dbName);
if (db == null) {
throw new DdlException("Database[" + dbName + "] does not exist");
}
return db;
}
/**
* step1: check whether the label has been used by the old load jobs (Load class)
* step2: check whether the label has been used by the v2 load jobs
*
* @param dbId
* @param label
* @throws DdlException throw exception when label has been used by an unfinished job.
*/
private void isLabelUsed(long dbId, String label)
throws DdlException {
// if label has been used in old load jobs
Catalog.getCurrentCatalog().getLoadInstance().isLabelUsed(dbId, label);
// if label has been used in v2 of load jobs
if (dbIdToLabelToLoadJobs.containsKey(dbId)) {
Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId);
if (labelToLoadJobs.containsKey(label)) {
List<LoadJob> labelLoadJobs = labelToLoadJobs.get(label);
if (labelLoadJobs.stream().filter(entity -> !entity.isFinished()).count() != 0) {
throw new LabelAlreadyUsedException(label);
}
}
}
}
private void readLock() {
lock.readLock().lock();
}
private void readUnlock() {
lock.readLock().unlock();
}
private void writeLock() {
lock.writeLock().lock();
}
private void writeUnlock() {
lock.writeLock().unlock();
}
}
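
Note: label uniqueness is enforced per database through the nested dbId -> label -> jobs map; a label may be reused only when every earlier job under it has finished. A standalone sketch of the check:

    import java.util.List;
    import java.util.Map;

    final class LabelCheckSketch {
        static final class Job { boolean finished; }

        static void check(Map<Long, Map<String, List<Job>>> dbToLabelToJobs, long dbId, String label) {
            Map<String, List<Job>> labelToJobs = dbToLabelToJobs.get(dbId);
            if (labelToJobs == null || !labelToJobs.containsKey(label)) {
                return;                                    // label never used in this db
            }
            if (labelToJobs.get(label).stream().anyMatch(j -> !j.finished)) {
                throw new IllegalStateException("Label " + label + " already used");
            }
        }
    }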

View File: org/apache/doris/load/loadv2/LoadTask.java

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.load.loadv2;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.task.MasterTask;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public abstract class LoadTask extends MasterTask {
private static final Logger LOG = LogManager.getLogger(LoadTask.class);
protected LoadTaskCallback callback;
protected TaskAttachment attachment;
protected boolean isFinished = false;
public LoadTask(LoadTaskCallback callback) {
this.callback = callback;
}
@Override
protected void exec() {
Exception exception = null;
try {
// execute the task
executeTask();
isFinished = true;
// callback on task finished
callback.onTaskFinished(attachment);
} catch (Exception e) {
exception = e;
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
.add("error_msg", "Failed to execute load task").build(), e);
} finally {
if (!isFinished) {
// callback on task failed
callback.onTaskFailed(exception == null ? "unknown error" : exception.getMessage());
}
}
}
public boolean isFinished() {
return isFinished;
}
/**
* execute load task
*
* @throws UserException if the task fails
*/
abstract void executeTask() throws UserException;
}

View File: org/apache/doris/load/loadv2/LoadTaskCallback.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.load.loadv2;
public interface LoadTaskCallback {
long getCallbackId();
void onTaskFinished(TaskAttachment attachment);
void onTaskFailed(String errMsg);
}

View File: org/apache/doris/load/loadv2/LoadTimeoutChecker.java

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.load.loadv2;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.Daemon;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* LoadTimeoutChecker cancels jobs that have timed out.
* Jobs which are not finished, not cancelled and not committing are checked.
* A job has timed out when CurrentTS > (CreateTs + timeoutSecond * 1000).
*/
public class LoadTimeoutChecker extends Daemon {
private static final Logger LOG = LogManager.getLogger(LoadTimeoutChecker.class);
private LoadManager loadManager;
public LoadTimeoutChecker(LoadManager loadManager) {
super("Load job timeout checker", Config.load_checker_interval_second * 1000);
this.loadManager = loadManager;
}
@Override
protected void runOneCycle() {
try {
loadManager.processTimeoutJobs();
} catch (Throwable e) {
LOG.warn("Failed to process one round of LoadJobScheduler with error message {}", e.getMessage(), e);
}
}
}

View File: org/apache/doris/load/loadv2/LoadingTaskPlanner.java

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.planner.BrokerScanNode;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
public class LoadingTaskPlanner {
private static final Logger LOG = LogManager.getLogger(LoadingTaskPlanner.class);
// Input params
private final long txnId;
private final long dbId;
private final OlapTable table;
private final BrokerDesc brokerDesc;
private final List<BrokerFileGroup> fileGroups;
// Something useful
private Analyzer analyzer = new Analyzer(Catalog.getInstance(), null);
private DescriptorTable descTable = analyzer.getDescTbl();
// Output params
private List<PlanFragment> fragments = Lists.newArrayList();
private List<ScanNode> scanNodes = Lists.newArrayList();
private int nextNodeId = 0;
public LoadingTaskPlanner(long txnId, long dbId, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups) {
this.txnId = txnId;
this.dbId = dbId;
this.table = table;
this.brokerDesc = brokerDesc;
this.fileGroups = brokerFileGroups;
}
public void plan(List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) throws UserException {
// Generate tuple descriptor
List<Expr> slotRefs = Lists.newArrayList();
TupleDescriptor tupleDesc = descTable.createTupleDescriptor();
for (Column col : table.getBaseSchema()) {
SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc);
slotDesc.setIsMaterialized(true);
slotDesc.setColumn(col);
if (col.isAllowNull()) {
slotDesc.setIsNullable(true);
} else {
slotDesc.setIsNullable(false);
}
slotRefs.add(new SlotRef(slotDesc));
}
// Generate plan trees
// 1. Broker scan node
BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), tupleDesc, "BrokerScanNode",
fileStatusesList, filesAdded);
scanNode.setLoadInfo(table, brokerDesc, fileGroups);
scanNode.init(analyzer);
scanNodes.add(scanNode);
// 2. Olap table sink
String partitionNames = convertBrokerDescPartitionInfo();
OlapTableSink olapTableSink = new OlapTableSink(table, tupleDesc, partitionNames);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
olapTableSink.init(loadId, txnId, dbId);
olapTableSink.finalize();
// 3. Plan fragment
PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(0), scanNode, DataPartition.RANDOM);
sinkFragment.setSink(olapTableSink);
fragments.add(sinkFragment);
// 4. finalize
for (PlanFragment fragment : fragments) {
try {
fragment.finalize(analyzer, false);
} catch (NotImplementedException e) {
LOG.warn("fragment finalize failed", e);
throw new UserException("Fragment finalize failed.");
}
}
Collections.reverse(fragments);
}
public DescriptorTable getDescTable() {
return descTable;
}
public List<PlanFragment> getFragments() {
return fragments;
}
public List<ScanNode> getScanNodes() {
return scanNodes;
}
private String convertBrokerDescPartitionInfo() {
String result = "";
for (BrokerFileGroup brokerFileGroup : fileGroups) {
result += Joiner.on(",").join(brokerFileGroup.getPartitionNames());
result += ",";
}
// Strip only the single trailing comma; "- 2" would also drop the last
// character of the final partition name.
result = result.substring(0, result.length() - 1);
return result;
}
}
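For orientation, a hedged sketch of how a loading task might drive this planner; the surrounding variables (olapTable, fileStatusesList, filesAdded, and the caller itself) are illustrative assumptions, not part of this commit:
// Hypothetical caller, e.g. inside a loading task, after broker files are listed:
LoadingTaskPlanner planner = new LoadingTaskPlanner(txnId, dbId, olapTable, brokerDesc, fileGroups);
planner.plan(fileStatusesList, filesAdded);            // throws UserException on planning errors
List<PlanFragment> fragments = planner.getFragments(); // single scan -> sink fragment
List<ScanNode> scanNodes = planner.getScanNodes();     // broker scan nodes for the coordinator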

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.load.loadv2;
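/**
 * Marker interface for data carried from a finished load task to the next
 * stage of its job.
 */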
public interface TaskAttachment {
}

View File

@ -46,6 +46,7 @@ import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
@ -83,7 +84,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* desireTaskConcurrentNum is the number of concurrent stream load tasks the user expects
* The routine load job supports different streaming media such as KAFKA
*/
public abstract class RoutineLoadJob implements TxnStateChangeCallback, Writable {
public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback implements Writable {
private static final Logger LOG = LogManager.getLogger(RoutineLoadJob.class);
public static final long DEFAULT_MAX_ERROR_NUM = 0;

View File

@ -46,7 +46,7 @@ public class RoutineLoadScheduler extends Daemon {
}
public RoutineLoadScheduler(RoutineLoadManager routineLoadManager) {
super("Routine load", DEFAULT_INTERVAL_SECONDS * 1000);
super("Routine load scheduler", DEFAULT_INTERVAL_SECONDS * 1000);
this.routineLoadManager = routineLoadManager;
}

View File

@ -69,12 +69,12 @@ public class RoutineLoadTaskScheduler extends Daemon {
@VisibleForTesting
public RoutineLoadTaskScheduler() {
super("routine load task", 0);
super("Routine load task scheduler", 0);
this.routineLoadManager = Catalog.getInstance().getRoutineLoadManager();
}
public RoutineLoadTaskScheduler(RoutineLoadManager routineLoadManager) {
super("routine load task", 0);
super("Routine load task scheduler", 0);
this.routineLoadManager = routineLoadManager;
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.doris.transaction;
import org.apache.doris.common.UserException;
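/**
 * No-op implementation of every TxnStateChangeCallback hook except getId();
 * subclasses override only the state transitions they care about.
 */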
public abstract class AbstractTxnStateChangeCallback implements TxnStateChangeCallback {
@Override
public void beforeCommitted(TransactionState txnState) throws TransactionException {
}
@Override
public void beforeAborted(TransactionState txnState) throws TransactionException {
}
@Override
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws UserException {
}
@Override
public void replayOnCommitted(TransactionState txnState) {
}
@Override
public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) throws UserException {
}
@Override
public void replayOnAborted(TransactionState txnState) {
}
@Override
public void afterVisible(TransactionState txnState, boolean txnOperated) {
}
}

View File

@ -566,7 +566,7 @@ public class GlobalTransactionMgr {
* @param errorReplicaIds
*/
public void finishTransaction(long transactionId, Set<Long> errorReplicaIds) {
public void finishTransaction(long transactionId, Set<Long> errorReplicaIds) throws UserException {
TransactionState transactionState = idToTransactionState.get(transactionId);
// add all commit errors and publish errors to a single set
if (errorReplicaIds == null) {
@ -699,14 +699,17 @@ public class GlobalTransactionMgr {
if (hasError) {
return;
}
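// txnOperated records whether the VISIBLE transition was actually applied,
// so the afterVisible callback can distinguish success from failure.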
boolean txnOperated = false;
writeLock();
try {
transactionState.setErrorReplicas(errorReplicaIds);
transactionState.setFinishTime(System.currentTimeMillis());
transactionState.setTransactionStatus(TransactionStatus.VISIBLE);
unprotectUpsertTransactionState(transactionState);
txnOperated = true;
} finally {
writeUnlock();
transactionState.afterStateTransform(TransactionStatus.VISIBLE, txnOperated);
}
updateCatalogAfterVisible(transactionState, db);
} finally {

View File

@ -326,6 +326,9 @@ public class TransactionState implements Writable {
case COMMITTED:
callback.afterCommitted(this, txnOperated);
break;
case VISIBLE:
callback.afterVisible(this, txnOperated);
break;
default:
break;
}

View File

@ -21,7 +21,7 @@ import org.apache.doris.common.UserException;
public interface TxnStateChangeCallback {
public long getId();
long getId();
/**
* Executed before the txn is committed; checks whether the txn can be committed.
@ -29,7 +29,7 @@ public interface TxnStateChangeCallback {
* @throws TransactionException if the transaction cannot be committed or an exception occurs beforehand;
* in that case the commit fails.
*/
public void beforeCommitted(TransactionState txnState) throws TransactionException;
void beforeCommitted(TransactionState txnState) throws TransactionException;
/**
* Executed before the txn is aborted; checks whether the txn can be aborted.
@ -38,16 +38,16 @@ public interface TxnStateChangeCallback {
* @throws TransactionException if the transaction cannot be aborted or an exception occurs beforehand;
* in that case the abort fails.
*/
public void beforeAborted(TransactionState txnState) throws TransactionException;
void beforeAborted(TransactionState txnState) throws TransactionException;
/**
* Updates the catalog of the job related to the txn after the transaction has been committed.
*
* @param txnState
*/
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws UserException;
void afterCommitted(TransactionState txnState, boolean txnOperated) throws UserException;
public void replayOnCommitted(TransactionState txnState);
void replayOnCommitted(TransactionState txnState);
/**
* Executed after the transaction has been aborted.
@ -57,7 +57,9 @@ public interface TxnStateChangeCallback {
* may be null
*/
public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) throws UserException;
void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) throws UserException;
public void replayOnAborted(TransactionState txnState);
void replayOnAborted(TransactionState txnState);
void afterVisible(TransactionState txnState, boolean txnOperated);
}
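For context, a minimal sketch of a job-side callback built on the new base class; the class name, the callbackId field, and the FINISHED bookkeeping are illustrative assumptions, not code from this commit:
package org.apache.doris.transaction;

public class SampleLoadJobCallback extends AbstractTxnStateChangeCallback {
    private final long callbackId;                 // id registered with the txn manager (assumption)
    private volatile boolean isCommitting = false;

    public SampleLoadJobCallback(long callbackId) {
        this.callbackId = callbackId;
    }

    @Override
    public long getId() {
        return callbackId;
    }

    @Override
    public void beforeCommitted(TransactionState txnState) throws TransactionException {
        isCommitting = true; // from here on the job should refuse cancellation
    }

    @Override
    public void afterVisible(TransactionState txnState, boolean txnOperated) {
        if (txnOperated) {
            // mark the job FINISHED only once the txn is actually visible
        }
    }
}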