diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index da9d5253b4..50088c87ba 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -132,6 +132,8 @@ import org.apache.doris.load.LoadChecker; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.LoadJob; import org.apache.doris.load.LoadJob.JobState; +import org.apache.doris.load.loadv2.LoadJobScheduler; +import org.apache.doris.load.loadv2.LoadManager; import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.load.routineload.RoutineLoadScheduler; import org.apache.doris.load.routineload.RoutineLoadTaskScheduler; @@ -175,6 +177,7 @@ import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.CreateReplicaTask; +import org.apache.doris.task.MasterTaskExecutor; import org.apache.doris.task.PullLoadJobMgr; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -260,6 +263,7 @@ public class Catalog { private ConcurrentHashMap nameToCluster; private Load load; + private LoadManager loadManager; private RoutineLoadManager routineLoadManager; private ExportMgr exportMgr; private Clone clone; @@ -351,6 +355,10 @@ public class Catalog { private TabletChecker tabletChecker; + private MasterTaskExecutor loadTaskScheduler; + + private LoadJobScheduler loadJobScheduler; + private RoutineLoadScheduler routineLoadScheduler; private RoutineLoadTaskScheduler routineLoadTaskScheduler; @@ -473,6 +481,9 @@ public class Catalog { this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat); this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat); + this.loadTaskScheduler = new MasterTaskExecutor(10); + this.loadJobScheduler = new LoadJobScheduler(); + this.loadManager = new LoadManager(loadJobScheduler); this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager); this.routineLoadTaskScheduler = new RoutineLoadTaskScheduler(routineLoadManager); } @@ -2037,6 +2048,7 @@ public class Catalog { protected void runOneCycle() { load.removeOldLoadJobs(); load.removeOldDeleteJobs(); + loadManager.removeOldLoadJob(); exportMgr.removeOldExportJobs(); } }; @@ -4555,6 +4567,14 @@ public class Catalog { return this.load; } + public LoadManager getLoadManager() { + return loadManager; + } + + public MasterTaskExecutor getLoadTaskScheduler() { + return loadTaskScheduler; + } + public RoutineLoadManager getRoutineLoadManager() { return routineLoadManager; } diff --git a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 458b0544d4..e9a2fcd3fb 100644 --- a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -54,10 +54,6 @@ public class BrokerFileGroup implements Writable { // input private DataDescription dataDescription; - // Now we don't save this in image, only use this to parse DataDescription; - // if we have this require later, we save this here. - private BrokerDesc brokerDesc; - private long tableId; private String valueSeparator; private String lineDelimiter; @@ -83,9 +79,8 @@ public class BrokerFileGroup implements Writable { this.filePathes = table.getPaths(); } - public BrokerFileGroup(DataDescription dataDescription, BrokerDesc desc) { + public BrokerFileGroup(DataDescription dataDescription) { this.dataDescription = dataDescription; - this.brokerDesc = desc; exprColumnMap = dataDescription.getParsedExprMap(); } @@ -164,6 +159,10 @@ public class BrokerFileGroup implements Writable { return partitionIds; } + public List getPartitionNames(){ + return dataDescription.getPartitionNames(); + } + public List getFilePathes() { return filePathes; } diff --git a/fe/src/main/java/org/apache/doris/load/EtlStatus.java b/fe/src/main/java/org/apache/doris/load/EtlStatus.java index ebd176c4a1..cf5179fc4e 100644 --- a/fe/src/main/java/org/apache/doris/load/EtlStatus.java +++ b/fe/src/main/java/org/apache/doris/load/EtlStatus.java @@ -81,6 +81,10 @@ public class EtlStatus implements Writable { return counters; } + public void replaceCounter(String key, String value) { + counters.put(key, value); + } + public void setCounters(Map counters) { this.counters = counters; } @@ -93,6 +97,16 @@ public class EtlStatus implements Writable { this.fileMap = fileMap; } + public void addAllFileMap(Map fileMap) { + this.fileMap.putAll(fileMap); + } + + public void reset() { + this.stats.clear(); + this.counters.clear(); + this.fileMap.clear(); + } + @Override public String toString() { return "EtlTaskStatus [state=" + state + ", trackingUrl=" + trackingUrl + ", stats=" + stats + ", counters=" diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java index 91aac372d5..d72f3aebae 100644 --- a/fe/src/main/java/org/apache/doris/load/Load.java +++ b/fe/src/main/java/org/apache/doris/load/Load.java @@ -519,7 +519,7 @@ public class Load { Map>> tableToPartitionSources = Maps.newHashMap(); for (DataDescription dataDescription : dataDescriptions) { // create source - createSource(db, dataDescription, tableToPartitionSources, job.getDeleteFlag()); + checkAndCreateSource(db, dataDescription, tableToPartitionSources, job.getDeleteFlag()); job.addTableName(dataDescription.getTableName()); } for (Entry>> tableEntry : tableToPartitionSources.entrySet()) { @@ -538,7 +538,7 @@ public class Load { if (etlJobType == EtlJobType.BROKER) { PullLoadSourceInfo sourceInfo = new PullLoadSourceInfo(); for (DataDescription dataDescription : dataDescriptions) { - BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription, stmt.getBrokerDesc()); + BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription); fileGroup.parse(db); sourceInfo.addFileGroup(fileGroup); } @@ -646,8 +646,10 @@ public class Load { return job; } - private void createSource(Database db, DataDescription dataDescription, - Map>> tableToPartitionSources, boolean deleteFlag) throws DdlException { + public static void checkAndCreateSource(Database db, DataDescription dataDescription, + Map>> tableToPartitionSources, + boolean deleteFlag) + throws DdlException { Source source = new Source(dataDescription.getFilePathes()); long tableId = -1; Set sourcePartitionIds = Sets.newHashSet(); @@ -846,7 +848,7 @@ public class Load { checkMini = false; } - isLabelUsed(dbId, label, -1, checkMini); + unprotectIsLabelUsed(dbId, label, -1, checkMini); // add job Map> labelToLoadJobs = null; @@ -976,7 +978,7 @@ public class Load { long dbId = db.getId(); writeLock(); try { - if (isLabelUsed(dbId, label, timestamp, true)) { + if (unprotectIsLabelUsed(dbId, label, timestamp, true)) { // label is used and this is a retry request. // no need to do further operation, just return. return false; @@ -1020,6 +1022,15 @@ public class Load { } } + public boolean isLabelUsed(long dbId, String label) throws DdlException { + readLock(); + try { + return unprotectIsLabelUsed(dbId, label, -1, false); + } finally { + readUnlock(); + } + } + /* * 1. if label is already used, and this is not a retry request, * throw exception ("Label already used") @@ -1028,7 +1039,7 @@ public class Load { * 3. if label is not used, return false * 4. throw exception if encounter error. */ - private boolean isLabelUsed(long dbId, String label, long timestamp, boolean checkMini) + private boolean unprotectIsLabelUsed(long dbId, String label, long timestamp, boolean checkMini) throws DdlException { // check dbLabelToLoadJobs if (dbLabelToLoadJobs.containsKey(dbId)) { diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java new file mode 100644 index 0000000000..87968345e5 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.load.loadv2; + + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.DataDescription; +import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.load.FailMsg; +import org.apache.doris.load.PullLoadSourceInfo; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; + +/** + * There are 3 steps in BrokerLoadJob: BrokerPendingTask, LoadLoadingTask, CommitAndPublishTxn. + * Step1: BrokerPendingTask will be created on method of executeJob. + * Step2: LoadLoadingTasks will be created by the method of onTaskFinished when BrokerPendingTask is finished. + * Step3: CommitAndPublicTxn will be called by the method of onTaskFinished when all of LoadLoadingTasks are finished. + */ +public class BrokerLoadJob extends LoadJob { + + private static final Logger LOG = LogManager.getLogger(BrokerLoadJob.class); + + private BrokerDesc brokerDesc; + // include broker desc and data desc + private PullLoadSourceInfo dataSourceInfo = new PullLoadSourceInfo(); + + // it will be set to true when pending task finished + private boolean isLoading = false; + + public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc) { + super(dbId, label); + this.timeoutSecond = Config.pull_load_task_default_timeout_second; + this.brokerDesc = brokerDesc; + } + + public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException { + // get db id + String dbName = stmt.getLabel().getDbName(); + Database db = Catalog.getCurrentCatalog().getDb(stmt.getLabel().getDbName()); + if (db == null) { + throw new DdlException("Database[" + dbName + "] does not exist"); + } + + // create job + BrokerLoadJob brokerLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(), + stmt.getBrokerDesc()); + brokerLoadJob.setJobProperties(stmt.getProperties()); + brokerLoadJob.checkDataSourceInfo(db, stmt.getDataDescriptions()); + brokerLoadJob.setDataSourceInfo(db, stmt.getDataDescriptions()); + return brokerLoadJob; + } + + private void setDataSourceInfo(Database db, List dataDescriptions) throws DdlException { + for (DataDescription dataDescription : dataDescriptions) { + BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription); + fileGroup.parse(db); + dataSourceInfo.addFileGroup(fileGroup); + } + } + + @Override + public void executeJob() { + LoadTask task = new BrokerLoadPendingTask(this, dataSourceInfo.getIdToFileGroups(), brokerDesc); + tasks.add(task); + Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task); + } + + /** + * Situation1: When attachment is instance of BrokerPendingTaskAttachment, this method is called by broker pending task. + * LoadLoadingTask will be created after BrokerPendingTask is finished. + * Situation2: When attachment is instance of BrokerLoadingTaskAttachment, this method is called by LoadLoadingTask. + * CommitTxn will be called after all of LoadingTasks are finished. + * + * @param attachment + */ + @Override + public void onTaskFinished(TaskAttachment attachment) { + if (attachment instanceof BrokerPendingTaskAttachment) { + onPendingTaskFinished((BrokerPendingTaskAttachment) attachment); + } else if (attachment instanceof BrokerLoadingTaskAttachment) { + onLoadingTaskFinished((BrokerLoadingTaskAttachment) attachment); + } + } + + @Override + public void onTaskFailed(String errMsg) { + cancelJobWithoutCheck(FailMsg.CancelType.LOAD_RUN_FAIL, errMsg); + } + + /** + * step1: divide job into loading task + * step2: init the plan of task + * step3: submit tasks into loadingTaskExecutor + * @param attachment BrokerPendingTaskAttachment + */ + private void onPendingTaskFinished(BrokerPendingTaskAttachment attachment) { + writeLock(); + try { + // check if job has been cancelled + if (isFinished()) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("error_msg", "this task will be ignored when job is finished") + .build()); + return; + } + if (isLoading) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("error_msg", "this is a duplicated callback of pending task " + + "when broker already has loading task") + .build()); + return; + } + isLoading = true; + } finally { + writeUnlock(); + } + + Database db = null; + try { + db = getDb(); + createLoadingTask(db, attachment); + } catch (UserException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("database_id", dbId) + .add("error_msg", "Failed to divide job into loading task.") + .build(), e); + cancelJobWithoutCheck(FailMsg.CancelType.ETL_RUN_FAIL, e.getMessage()); + return; + } + + loadStartTimestamp = System.currentTimeMillis(); + } + + private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachment) throws UserException { + // divide job into broker loading task by table + db.readLock(); + try { + for (Map.Entry> entry : + dataSourceInfo.getIdToFileGroups().entrySet()) { + long tableId = entry.getKey(); + OlapTable table = (OlapTable) db.getTable(tableId); + if (table == null) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("database_id", dbId) + .add("table_id", tableId) + .add("error_msg", "Failed to divide job into loading task when table not found") + .build()); + throw new MetaNotFoundException("Failed to divide job into loading task when table " + + tableId + " not found"); + } + + // Generate loading task and init the plan of task + LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc, + entry.getValue(), getDeadlineMs(), execMemLimit, + transactionId, this); + task.init(attachment.getFileStatusByTable(tableId), + attachment.getFileNumByTable(tableId)); + // Add tasks into list and pool + tasks.add(task); + Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task); + } + } finally { + db.readUnlock(); + } + } + + private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) { + writeLock(); + try { + // check if job has been cancelled + if (isFinished()) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("error_msg", "this task will be ignored when job is finished") + .build()); + return; + } + + // update loading status + updateLoadingStatus(attachment); + + // begin commit txn when all of loading tasks have been finished + if (!(tasks.size() == tasks.stream() + .filter(entity -> entity.isFinished()).count())) { + return; + } + + // check data quality + if (!checkDataQuality()) { + executeCancel(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, QUALITY_FAIL_MSG); + return; + } + } finally { + writeUnlock(); + } + + try { + Catalog.getCurrentGlobalTransactionMgr().commitTransaction( + dbId, transactionId, commitInfos); + } catch (UserException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("database_id", dbId) + .add("error_msg", "Failed to commit txn.") + .build(), e); + cancelJobWithoutCheck(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()); + return; + } + } + + private void updateLoadingStatus(BrokerLoadingTaskAttachment attachment) { + loadingStatus.replaceCounter(DPP_ABNORMAL_ALL, + increaseCounter(DPP_ABNORMAL_ALL, attachment.getCounter(DPP_ABNORMAL_ALL))); + loadingStatus.replaceCounter(DPP_NORMAL_ALL, + increaseCounter(DPP_NORMAL_ALL, attachment.getCounter(DPP_NORMAL_ALL))); + if (attachment.getTrackingUrl() != null) { + loadingStatus.setTrackingUrl(attachment.getTrackingUrl()); + } + commitInfos.addAll(attachment.getCommitInfoList()); + int finishedTaskNum = (int) tasks.stream().filter(entity -> entity.isFinished()).count(); + progress = finishedTaskNum / tasks.size() * 100; + if (progress == 100) { + progress = 99; + } + } + + private String increaseCounter(String key, String deltaValue) { + long value = Long.valueOf(loadingStatus.getCounters().get(key)); + if (deltaValue != null) { + value += Long.valueOf(deltaValue); + } + return String.valueOf(value); + } +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java new file mode 100644 index 0000000000..e178cd34a8 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.load.loadv2; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.thrift.TBrokerFileStatus; + +import com.google.common.collect.Lists; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; + +public class BrokerLoadPendingTask extends LoadTask { + + private static final Logger LOG = LogManager.getLogger(BrokerLoadPendingTask.class); + + private Map> tableToBrokerFileList; + private BrokerDesc brokerDesc; + + public BrokerLoadPendingTask(LoadTaskCallback loadTaskCallback, + Map> tableToBrokerFileList, + BrokerDesc brokerDesc) { + super(loadTaskCallback); + this.attachment = new BrokerPendingTaskAttachment(); + this.tableToBrokerFileList = tableToBrokerFileList; + this.brokerDesc = brokerDesc; + } + + @Override + void executeTask() throws UserException { + getAllFileStatus(); + } + + private void getAllFileStatus() + throws UserException { + for (Map.Entry> entry : tableToBrokerFileList.entrySet()) { + long tableId = entry.getKey(); + + List> fileStatusList = Lists.newArrayList(); + List fileGroups = entry.getValue(); + for (BrokerFileGroup fileGroup : fileGroups) { + List fileStatuses = Lists.newArrayList(); + for (String path : fileGroup.getFilePathes()) { + BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses); + } + fileStatusList.add(fileStatuses); + if (LOG.isDebugEnabled()) { + for (TBrokerFileStatus fstatus : fileStatuses) { + LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) + .add("file_status", fstatus) + .build()); + } + } + } + + ((BrokerPendingTaskAttachment) attachment).addFileStatus(tableId, fileStatusList); + } + } +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java new file mode 100644 index 0000000000..052d1e1995 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadingTaskAttachment.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.load.loadv2; + +import org.apache.doris.transaction.TabletCommitInfo; + +import java.util.List; +import java.util.Map; + +public class BrokerLoadingTaskAttachment implements TaskAttachment{ + + private Map counters; + private String trackingUrl; + private List commitInfoList; + + public BrokerLoadingTaskAttachment(Map counters, String trackingUrl, + List commitInfoList) { + this.trackingUrl = trackingUrl; + this.counters = counters; + this.commitInfoList = commitInfoList; + } + + public String getCounter(String key) { + return counters.get(key); + } + + public String getTrackingUrl() { + return trackingUrl; + } + + public List getCommitInfoList() { + return commitInfoList; + } +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerPendingTaskAttachment.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerPendingTaskAttachment.java new file mode 100644 index 0000000000..55956fa67a --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerPendingTaskAttachment.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.load.loadv2; + +import org.apache.doris.thrift.TBrokerFileStatus; + +import com.google.common.collect.Maps; + +import java.util.List; +import java.util.Map; + +public class BrokerPendingTaskAttachment implements TaskAttachment { + + // table id -> file status + private Map>> fileStatusMap = Maps.newHashMap(); + // table id -> total file num + private Map fileNumMap = Maps.newHashMap(); + + public void addFileStatus(long tableId, List> fileStatusList) { + fileStatusMap.put(tableId, fileStatusList); + fileNumMap.put(tableId, fileStatusList.stream().mapToInt(entity -> entity.size()).sum()); + } + + public List> getFileStatusByTable(long tableId) { + return fileStatusMap.get(tableId); + } + + public int getFileNumByTable(long tableId) { + return fileNumMap.get(tableId); + } +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java b/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java new file mode 100644 index 0000000000..fd73af4ba3 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.load.loadv2; + +public enum JobState { + PENDING, + LOADING, + FINISHED, + CANCELLED +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java new file mode 100644 index 0000000000..0351798f6b --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.load.loadv2; + +import org.apache.doris.analysis.DataDescription; +import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.load.EtlStatus; +import org.apache.doris.load.FailMsg; +import org.apache.doris.load.Load; +import org.apache.doris.load.Source; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.service.FrontendOptions; +import org.apache.doris.thrift.TEtlState; +import org.apache.doris.transaction.AbstractTxnStateChangeCallback; +import org.apache.doris.transaction.BeginTransactionException; +import org.apache.doris.transaction.TabletCommitInfo; +import org.apache.doris.transaction.TransactionException; +import org.apache.doris.transaction.TransactionState; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public abstract class LoadJob extends AbstractTxnStateChangeCallback implements LoadTaskCallback { + + private static final Logger LOG = LogManager.getLogger(LoadJob.class); + + protected static final String QUALITY_FAIL_MSG = "quality not good enough to cancel"; + protected static final String DPP_NORMAL_ALL = "dpp.norm.ALL"; + protected static final String DPP_ABNORMAL_ALL = "dpp.abnorm.ALL"; + + protected long id = Catalog.getCurrentCatalog().getNextId(); + protected long dbId; + protected String label; + protected JobState state = JobState.PENDING; + + // optional properties + // timeout second need to be reset in constructor of subclass + protected int timeoutSecond = Config.pull_load_task_default_timeout_second; + protected long execMemLimit = 2147483648L; // 2GB; + protected double maxFilterRatio = 0; + protected boolean deleteFlag = false; + + protected long createTimestamp = System.currentTimeMillis(); + protected long loadStartTimestamp = -1; + protected long finishTimestamp = -1; + + protected long transactionId; + protected FailMsg failMsg; + protected List tasks = Lists.newArrayList(); + protected EtlStatus loadingStatus = new EtlStatus(); + // 0: the job status is pending + // n/100: n is the number of task which has been finished + // 99: all of tasks have been finished + // 100: txn status is visible and load has been finished + protected int progress; + protected List commitInfos = Lists.newArrayList(); + + // non-persistence + protected boolean isCommitting = false; + + protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + + public LoadJob(long dbId, String label) { + this.dbId = dbId; + this.label = label; + } + + protected void readLock() { + lock.readLock().lock(); + } + + protected void readUnlock() { + lock.readLock().unlock(); + } + + protected void writeLock() { + lock.writeLock().lock(); + } + + protected void writeUnlock() { + lock.writeLock().unlock(); + } + + public long getId() { + return id; + } + + public Database getDb() throws MetaNotFoundException { + // get db + Database db = Catalog.getInstance().getDb(dbId); + if (db == null) { + throw new MetaNotFoundException("Database " + dbId + " already has been deleted"); + } + return db; + } + + public long getDbId() { + return dbId; + } + + public String getLabel() { + return label; + } + + public JobState getState() { + return state; + } + + public long getDeadlineMs() { + return createTimestamp + timeoutSecond * 1000; + } + + public long getLeftTimeMs() { + return getDeadlineMs() - System.currentTimeMillis(); + } + + public long getFinishTimestamp() { + return finishTimestamp; + } + + public boolean isFinished() { + readLock(); + try { + return state == JobState.FINISHED || state == JobState.CANCELLED; + } finally { + readUnlock(); + } + } + + protected void setJobProperties(Map properties) throws DdlException { + // resource info + if (ConnectContext.get() != null) { + execMemLimit = ConnectContext.get().getSessionVariable().getMaxExecMemByte(); + } + + // job properties + if (properties != null) { + if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) { + try { + timeoutSecond = Integer.parseInt(properties.get(LoadStmt.TIMEOUT_PROPERTY)); + } catch (NumberFormatException e) { + throw new DdlException("Timeout is not INT", e); + } + } + + if (properties.containsKey(LoadStmt.MAX_FILTER_RATIO_PROPERTY)) { + try { + maxFilterRatio = Double.parseDouble(properties.get(LoadStmt.MAX_FILTER_RATIO_PROPERTY)); + } catch (NumberFormatException e) { + throw new DdlException("Max filter ratio is not DOUBLE", e); + } + } + + if (properties.containsKey(LoadStmt.LOAD_DELETE_FLAG_PROPERTY)) { + String flag = properties.get(LoadStmt.LOAD_DELETE_FLAG_PROPERTY); + if (flag.equalsIgnoreCase("true") || flag.equalsIgnoreCase("false")) { + deleteFlag = Boolean.parseBoolean(flag); + } else { + throw new DdlException("Value of delete flag is invalid"); + } + } + + if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) { + try { + execMemLimit = Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT)); + } catch (NumberFormatException e) { + throw new DdlException("Execute memory limit is not Long", e); + } + } + } + } + + protected void checkDataSourceInfo(Database db, List dataDescriptions) throws DdlException { + for (DataDescription dataDescription : dataDescriptions) { + // loadInfo is a temporary param for the method of checkAndCreateSource. + // >> + Map>> loadInfo = Maps.newHashMap(); + // only support broker load now + Load.checkAndCreateSource(db, dataDescription, loadInfo, false); + } + } + + public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { + // register txn state listener + Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this); + transactionId = Catalog.getCurrentGlobalTransactionMgr() + .beginTransaction(dbId, label, -1, "FE: " + FrontendOptions.getLocalHostAddress(), + TransactionState.LoadJobSourceType.FRONTEND, id, + timeoutSecond - 1); + } + + /** + * create pending task for load job and add pending task into pool + * if job has been cancelled, this step will be ignored + * @throws LabelAlreadyUsedException the job is duplicated + * @throws BeginTransactionException the limit of load job is exceeded + * @throws AnalysisException there are error params in job + */ + public void execute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException { + writeLock(); + try { + // check if job state is pending + if (state != JobState.PENDING) { + return; + } + // the limit of job will be restrict when begin txn + beginTxn(); + executeJob(); + unprotectedUpdateState(JobState.LOADING); + } finally { + writeUnlock(); + } + } + + + public void processTimeout() { + writeLock(); + try { + if (isFinished() || getDeadlineMs() >= System.currentTimeMillis() || isCommitting) { + return; + } + executeCancel(FailMsg.CancelType.TIMEOUT, "loading timeout to cancel"); + } finally { + writeUnlock(); + } + } + + abstract void executeJob(); + + public void updateState(JobState jobState) { + writeLock(); + try { + unprotectedUpdateState(jobState); + } finally { + writeUnlock(); + } + // TODO(ML): edit log + } + + protected void unprotectedUpdateState(JobState jobState) { + switch (jobState) { + case LOADING: + executeLoad(); + break; + case FINISHED: + executeFinish(); + default: + break; + } + } + + private void executeLoad() { + loadStartTimestamp = System.currentTimeMillis(); + state = JobState.LOADING; + } + + public void cancelJobWithoutCheck(FailMsg.CancelType cancelType, String errMsg) { + writeLock(); + try { + executeCancel(cancelType, errMsg); + } finally { + writeUnlock(); + } + } + + public void cancelJob(FailMsg.CancelType cancelType, String errMsg) throws DdlException { + writeLock(); + try { + if (isCommitting) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("error_msg", "The txn which belongs to job is committing. " + + "The job could not be cancelled in this step").build()); + throw new DdlException("Job could not be cancelled while txn is committing"); + } + executeCancel(cancelType, errMsg); + } finally { + writeUnlock(); + } + } + + protected void executeCancel(FailMsg.CancelType cancelType, String errMsg) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("error_msg", "Failed to execute load with error " + errMsg) + .build()); + + // reset txn id + if (transactionId != -1) { + transactionId = -1; + } + + // clean the loadingStatus + loadingStatus.reset(); + loadingStatus.setState(TEtlState.CANCELLED); + + // tasks will not be removed from task pool. + // it will be aborted on the stage of onTaskFinished or onTaskFailed. + tasks.clear(); + + // set failMsg and state + failMsg = new FailMsg(cancelType, errMsg); + finishTimestamp = System.currentTimeMillis(); + state = JobState.CANCELLED; + + // remove callback + Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id); + } + + private void executeFinish() { + progress = 100; + finishTimestamp = System.currentTimeMillis(); + state = JobState.FINISHED; + Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().removeCallback(id); + + MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); + } + + protected boolean checkDataQuality() { + Map counters = loadingStatus.getCounters(); + if (!counters.containsKey(DPP_NORMAL_ALL) || !counters.containsKey(DPP_ABNORMAL_ALL)) { + return true; + } + + long normalNum = Long.parseLong(counters.get(DPP_NORMAL_ALL)); + long abnormalNum = Long.parseLong(counters.get(DPP_ABNORMAL_ALL)); + if (abnormalNum > (abnormalNum + normalNum) * maxFilterRatio) { + return false; + } + + return true; + } + + @Override + public long getCallbackId() { + return id; + } + + @Override + public void beforeCommitted(TransactionState txnState) throws TransactionException { + writeLock(); + try { + if (transactionId == -1) { + throw new TransactionException("txn could not be committed when job has been cancelled"); + } + isCommitting = true; + } finally { + writeUnlock(); + } + } + + @Override + public void afterCommitted(TransactionState txnState, boolean txnOperated) throws UserException { + if (txnOperated) { + return; + } + writeLock(); + try { + isCommitting = false; + } finally { + writeUnlock(); + } + } + + @Override + public void replayOnCommitted(TransactionState txnState) { + writeLock(); + try { + isCommitting = true; + } finally { + writeUnlock(); + } + } + + @Override + public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) + throws UserException { + if (!txnOperated) { + return; + } + writeLock(); + try { + if (transactionId == -1) { + return; + } + // cancel load job + executeCancel(FailMsg.CancelType.LOAD_RUN_FAIL, txnStatusChangeReason); + } finally { + writeUnlock(); + } + } + + @Override + public void replayOnAborted(TransactionState txnState) { + cancelJobWithoutCheck(FailMsg.CancelType.LOAD_RUN_FAIL, null); + } + + @Override + public void afterVisible(TransactionState txnState, boolean txnOperated) { + updateState(JobState.FINISHED); + } +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java new file mode 100644 index 0000000000..9d72b8aa66 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.load.loadv2; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.util.Daemon; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.load.FailMsg; +import org.apache.doris.transaction.BeginTransactionException; + +import com.google.common.collect.Queues; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.LinkedBlockingQueue; + +/** + * LoadScheduler will schedule the pending LoadJob which belongs to LoadManager. + * The function of execute will be called in LoadScheduler. + * The status of LoadJob will be changed to loading after LoadScheduler. + */ +public class LoadJobScheduler extends Daemon { + + private static final Logger LOG = LogManager.getLogger(LoadJobScheduler.class); + private LinkedBlockingQueue needScheduleJobs = Queues.newLinkedBlockingQueue(); + + public LoadJobScheduler() { + super("Load job scheduler", Config.load_checker_interval_second * 1000); + } + + @Override + protected void runOneCycle() { + try { + process(); + } catch (Throwable e) { + LOG.warn("Failed to process one round of LoadJobScheduler with error message {}", e.getMessage(), e); + } + } + + private void process() throws InterruptedException { + while (true) { + // take one load job from queue + LoadJob loadJob = needScheduleJobs.poll(); + if (loadJob == null) { + return; + } + + // schedule job + try { + loadJob.execute(); + } catch (LabelAlreadyUsedException | AnalysisException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()) + .add("error_msg", "There are error properties in job. Job will be cancelled") + .build(), e); + loadJob.cancelJobWithoutCheck(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()); + continue; + } catch (BeginTransactionException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()) + .add("error_msg", "Failed to begin txn when job is scheduling. " + + "Job will be rescheduled later") + .build(), e); + needScheduleJobs.put(loadJob); + return; + } + } + } + + public void submitJob(LoadJob job) { + needScheduleJobs.add(job); + } +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java new file mode 100644 index 0000000000..b4d24f77d6 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.load.loadv2; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.Status; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.qe.Coordinator; +import org.apache.doris.qe.QeProcessorImpl; +import org.apache.doris.task.MasterTask; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TQueryType; +import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.transaction.TabletCommitInfo; + +import com.google.common.collect.Maps; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class LoadLoadingTask extends LoadTask { + private static final Logger LOG = LogManager.getLogger(LoadLoadingTask.class); + + private final Database db; + private final OlapTable table; + private final BrokerDesc brokerDesc; + private final List fileGroups; + private final long jobDeadlineMs; + private final long execMemLimit; + private final long txnId; + + private LoadingTaskPlanner planner; + + private String errMsg; + + public LoadLoadingTask(Database db, OlapTable table, + BrokerDesc brokerDesc, List fileGroups, + long jobDeadlineMs, long execMemLimit, long txnId, LoadTaskCallback callback) { + super(callback); + this.db = db; + this.table = table; + this.brokerDesc = brokerDesc; + this.fileGroups = fileGroups; + this.jobDeadlineMs = jobDeadlineMs; + this.execMemLimit = execMemLimit; + this.txnId = txnId; + } + + public void init(List> fileStatusList, int fileNum) throws UserException { + planner = new LoadingTaskPlanner(txnId, db.getId(), table, brokerDesc, fileGroups); + planner.plan(fileStatusList, fileNum); + } + + @Override + protected void executeTask() throws UserException { + int retryTime = 3; + for (int i = 0; i < retryTime; ++i) { + isFinished = executeOnce(); + if (isFinished) { + return; + } + } + throw new UserException(errMsg); + } + + private boolean executeOnce() { + // New one query id, + UUID uuid = UUID.randomUUID(); + TUniqueId executeId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + Coordinator curCoordinator = new Coordinator(executeId, planner.getDescTable(), + planner.getFragments(), planner.getScanNodes(), db.getClusterName()); + curCoordinator.setQueryType(TQueryType.LOAD); + curCoordinator.setExecMemoryLimit(execMemLimit); + curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000)); + + try { + QeProcessorImpl.INSTANCE + .registerQuery(executeId, curCoordinator); + return actualExecute(curCoordinator); + } catch (UserException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) + .add("error_msg", "failed to execute loading task") + .build(), e); + errMsg = e.getMessage(); + return false; + } finally { + QeProcessorImpl.INSTANCE.unregisterQuery(executeId); + } + } + + private boolean actualExecute(Coordinator curCoordinator) { + int waitSecond = (int) (getLeftTimeMs() / 1000); + if (waitSecond <= 0) { + errMsg = "time out"; + return false; + } + + try { + curCoordinator.exec(); + } catch (Exception e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) + .add("error_msg", "coordinator execute failed") + .build(), e); + errMsg = "coordinator execute failed with error " + e.getMessage(); + return false; + } + if (curCoordinator.join(waitSecond)) { + Status status = curCoordinator.getExecStatus(); + if (status.ok()) { + attachment = new BrokerLoadingTaskAttachment(curCoordinator.getLoadCounters(), + curCoordinator.getTrackingUrl(), + TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos())); + return true; + } else { + errMsg = status.getErrorMsg(); + return false; + } + } else { + errMsg = "coordinator could not finished before job timeout"; + return false; + } + } + + private long getLeftTimeMs() { + return jobDeadlineMs - System.currentTimeMillis(); + } +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java new file mode 100644 index 0000000000..d421cf564d --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.load.loadv2; + +import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.task.MasterTaskExecutor; +import org.apache.doris.transaction.TransactionState; + +import com.google.common.collect.Maps; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +/** + * The broker and mini load jobs(v2) are included in this class. + */ +public class LoadManager { + private Map idToLoadJob = Maps.newConcurrentMap(); + private Map>> dbIdToLabelToLoadJobs = Maps.newConcurrentMap(); + private LoadJobScheduler loadJobScheduler; + + private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + + public LoadManager(LoadJobScheduler loadJobScheduler) { + this.loadJobScheduler = loadJobScheduler; + } + + /** + * This method will be invoked by the broker load(v2) now. + * @param stmt + * @throws DdlException + */ + public void createLoadJobFromStmt(LoadStmt stmt) throws DdlException { + Database database = checkDb(stmt.getLabel().getDbName()); + long dbId = database.getId(); + writeLock(); + try { + isLabelUsed(dbId, stmt.getLabel().getLabelName()); + if (stmt.getBrokerDesc() == null) { + throw new DdlException("LoadManager only support the broker load."); + } + BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(stmt); + addLoadJob(brokerLoadJob); + } finally { + writeUnlock(); + } + } + + private void addLoadJob(LoadJob loadJob) { + idToLoadJob.put(loadJob.getId(), loadJob); + long dbId = loadJob.getDbId(); + if (!dbIdToLabelToLoadJobs.containsKey(dbId)) { + dbIdToLabelToLoadJobs.put(loadJob.getDbId(), new ConcurrentHashMap<>()); + } + if (!dbIdToLabelToLoadJobs.get(dbId).containsKey(loadJob.getLabel())) { + dbIdToLabelToLoadJobs.get(dbId).put(loadJob.getLabel(), new ArrayList<>()); + } + dbIdToLabelToLoadJobs.get(dbId).get(loadJob.getLabel()).add(loadJob); + + // submit it + loadJobScheduler.submitJob(loadJob); + } + + public List getLoadJobByState(JobState jobState) { + return idToLoadJob.values().stream() + .filter(entity -> entity.getState() == jobState) + .collect(Collectors.toList()); + } + + public void removeOldLoadJob() { + long currentTimeMs = System.currentTimeMillis(); + + writeLock(); + try { + Iterator> iter = idToLoadJob.entrySet().iterator(); + while (iter.hasNext()) { + LoadJob job = iter.next().getValue(); + if (job.isFinished() + && ((currentTimeMs - job.getFinishTimestamp()) / 1000 > Config.label_keep_max_second)) { + iter.remove(); + dbIdToLabelToLoadJobs.get(job.getDbId()).get(job.getLabel()).remove(job); + } + } + } finally { + writeUnlock(); + } + } + + public void processTimeoutJobs() { + idToLoadJob.values().stream().forEach(entity -> entity.processTimeout()); + } + + private Database checkDb(String dbName) throws DdlException { + // get db + Database db = Catalog.getInstance().getDb(dbName); + if (db == null) { + throw new DdlException("Database[" + dbName + "] does not exist"); + } + return db; + } + + /** + * step1: if label has been used in old load jobs which belong to load class + * step2: if label has been used in v2 load jobs + * + * @param dbId + * @param label + * @throws DdlException throw exception when label has been used by an unfinished job. + */ + private void isLabelUsed(long dbId, String label) + throws DdlException { + // if label has been used in old load jobs + Catalog.getCurrentCatalog().getLoadInstance().isLabelUsed(dbId, label); + // if label has been used in v2 of load jobs + if (dbIdToLabelToLoadJobs.containsKey(dbId)) { + Map> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId); + if (labelToLoadJobs.containsKey(label)) { + List labelLoadJobs = labelToLoadJobs.get(label); + if (labelLoadJobs.stream().filter(entity -> !entity.isFinished()).count() != 0) { + throw new LabelAlreadyUsedException(label); + } + } + } + } + + private void readLock() { + lock.readLock().lock(); + } + + private void readUnlock() { + lock.readLock().unlock(); + } + + private void writeLock() { + lock.writeLock().lock(); + } + + private void writeUnlock() { + lock.writeLock().unlock(); + } +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java new file mode 100644 index 0000000000..1567454307 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.load.loadv2; + +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.task.MasterTask; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +public abstract class LoadTask extends MasterTask { + + private static final Logger LOG = LogManager.getLogger(LoadTask.class); + + protected LoadTaskCallback callback; + protected TaskAttachment attachment; + protected boolean isFinished = false; + + public LoadTask(LoadTaskCallback callback) { + this.callback = callback; + } + + @Override + protected void exec() { + Exception exception = null; + try { + // execute pending task + executeTask(); + isFinished = true; + // callback on pending task finished + callback.onTaskFinished(attachment); + } catch (Exception e) { + exception = e; + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()) + .add("error_msg", "Failed to execute load task").build(), e); + } finally { + if (!isFinished) { + // callback on pending task failed + callback.onTaskFailed(exception == null ? "unknown error" : exception.getMessage()); + } + } + } + + public boolean isFinished() { + return isFinished; + } + + /** + * execute load task + * + * @throws UserException task is failed + */ + abstract void executeTask() throws UserException; +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTaskCallback.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTaskCallback.java new file mode 100644 index 0000000000..838867dd4e --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTaskCallback.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.load.loadv2; + +public interface LoadTaskCallback { + long getCallbackId(); + + void onTaskFinished(TaskAttachment attachment); + + void onTaskFailed(String errMsg); +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTimeoutChecker.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTimeoutChecker.java new file mode 100644 index 0000000000..7ea61b88eb --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTimeoutChecker.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.load.loadv2; + +import org.apache.doris.common.Config; +import org.apache.doris.common.util.Daemon; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * LoadTimeoutChecker is performed to cancel the timeout job. + * The job which is not finished, not cancelled, not isCommitting will be checked. + * The standard of timeout is CurrentTS > (CreateTs + timeoutSeconds * 1000). + */ +public class LoadTimeoutChecker extends Daemon { + private static final Logger LOG = LogManager.getLogger(LoadTimeoutChecker.class); + + private LoadManager loadManager; + + public LoadTimeoutChecker(LoadManager loadManager) { + super("Load job timeout checker", Config.load_checker_interval_second * 1000); + this.loadManager = loadManager; + } + + @Override + protected void runOneCycle() { + try { + loadManager.processTimeoutJobs(); + } catch (Throwable e) { + LOG.warn("Failed to process one round of LoadJobScheduler with error message {}", e.getMessage(), e); + } + } +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java new file mode 100644 index 0000000000..6bf852a388 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.load.loadv2; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.DescriptorTable; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.NotImplementedException; +import org.apache.doris.common.UserException; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.planner.BrokerScanNode; +import org.apache.doris.planner.DataPartition; +import org.apache.doris.planner.OlapTableSink; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanFragmentId; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.ScanNode; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TUniqueId; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +public class LoadingTaskPlanner { + private static final Logger LOG = LogManager.getLogger(LoadingTaskPlanner.class); + + // Input params + private final long txnId; + private final long dbId; + private final OlapTable table; + private final BrokerDesc brokerDesc; + private final List fileGroups; + + // Something useful + private Analyzer analyzer = new Analyzer(Catalog.getInstance(), null); + private DescriptorTable descTable = analyzer.getDescTbl(); + + // Output params + private List fragments = Lists.newArrayList(); + private List scanNodes = Lists.newArrayList(); + + private int nextNodeId = 0; + + public LoadingTaskPlanner(long txnId, long dbId, OlapTable table, + BrokerDesc brokerDesc, List brokerFileGroups) { + this.txnId = txnId; + this.dbId = dbId; + this.table = table; + this.brokerDesc = brokerDesc; + this.fileGroups = brokerFileGroups; + } + + public void plan(List> fileStatusesList, int filesAdded) throws UserException { + // Generate tuple descriptor + List slotRefs = Lists.newArrayList(); + TupleDescriptor tupleDesc = descTable.createTupleDescriptor(); + for (Column col : table.getBaseSchema()) { + SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc); + slotDesc.setIsMaterialized(true); + slotDesc.setColumn(col); + if (col.isAllowNull()) { + slotDesc.setIsNullable(true); + } else { + slotDesc.setIsNullable(false); + } + slotRefs.add(new SlotRef(slotDesc)); + } + + // Generate plan trees + // 1. Broker scan node + BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), tupleDesc, "BrokerScanNode", + fileStatusesList, filesAdded); + scanNode.setLoadInfo(table, brokerDesc, fileGroups); + scanNode.init(analyzer); + scanNodes.add(scanNode); + + // 2. Olap table sink + String partitionNames = convertBrokerDescPartitionInfo(); + OlapTableSink olapTableSink = new OlapTableSink(table, tupleDesc, partitionNames); + UUID uuid = UUID.randomUUID(); + TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + olapTableSink.init(loadId, txnId, dbId); + olapTableSink.finalize(); + + // 3. Plan fragment + PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(0), scanNode, DataPartition.RANDOM); + sinkFragment.setSink(olapTableSink); + + fragments.add(sinkFragment); + + // 4. finalize + for (PlanFragment fragment : fragments) { + try { + fragment.finalize(analyzer, false); + } catch (NotImplementedException e) { + LOG.info("Fragment finalize failed.{}", e); + throw new UserException("Fragment finalize failed."); + } + } + Collections.reverse(fragments); + } + + public DescriptorTable getDescTable() { + return descTable; + } + + public List getFragments() { + return fragments; + } + + public List getScanNodes() { + return scanNodes; + } + + private String convertBrokerDescPartitionInfo() { + String result = ""; + for (BrokerFileGroup brokerFileGroup : fileGroups) { + result += Joiner.on(",").join(brokerFileGroup.getPartitionNames()); + result += ","; + } + result = result.substring(0, result.length() - 2); + return result; + } +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/TaskAttachment.java b/fe/src/main/java/org/apache/doris/load/loadv2/TaskAttachment.java new file mode 100644 index 0000000000..14363fb72e --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/TaskAttachment.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.load.loadv2; + +public interface TaskAttachment { +} diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index cc54576440..518f8b1a1f 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -46,6 +46,7 @@ import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.transaction.AbstractTxnStateChangeCallback; import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; @@ -83,7 +84,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * The desireTaskConcurrentNum means that user expect the number of concurrent stream load * The routine load job support different streaming medium such as KAFKA */ -public abstract class RoutineLoadJob implements TxnStateChangeCallback, Writable { +public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback implements Writable { private static final Logger LOG = LogManager.getLogger(RoutineLoadJob.class); public static final long DEFAULT_MAX_ERROR_NUM = 0; diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java index 06798be403..1aaf19a9d8 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java @@ -46,7 +46,7 @@ public class RoutineLoadScheduler extends Daemon { } public RoutineLoadScheduler(RoutineLoadManager routineLoadManager) { - super("Routine load", DEFAULT_INTERVAL_SECONDS * 1000); + super("Routine load scheduler", DEFAULT_INTERVAL_SECONDS * 1000); this.routineLoadManager = routineLoadManager; } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index 79343aaafb..a1bbd88e5a 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -69,12 +69,12 @@ public class RoutineLoadTaskScheduler extends Daemon { @VisibleForTesting public RoutineLoadTaskScheduler() { - super("routine load task", 0); + super("Routine load task scheduler", 0); this.routineLoadManager = Catalog.getInstance().getRoutineLoadManager(); } public RoutineLoadTaskScheduler(RoutineLoadManager routineLoadManager) { - super("routine load task", 0); + super("Routine load task scheduler", 0); this.routineLoadManager = routineLoadManager; } diff --git a/fe/src/main/java/org/apache/doris/transaction/AbstractTxnStateChangeCallback.java b/fe/src/main/java/org/apache/doris/transaction/AbstractTxnStateChangeCallback.java new file mode 100644 index 0000000000..f1ab03eaf9 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/transaction/AbstractTxnStateChangeCallback.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.doris.transaction; + +import org.apache.doris.common.UserException; + +public abstract class AbstractTxnStateChangeCallback implements TxnStateChangeCallback { + @Override + public void beforeCommitted(TransactionState txnState) throws TransactionException { + + } + + @Override + public void beforeAborted(TransactionState txnState) throws TransactionException { + + } + + @Override + public void afterCommitted(TransactionState txnState, boolean txnOperated) throws UserException { + + } + + @Override + public void replayOnCommitted(TransactionState txnState) { + + } + + @Override + public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) throws UserException { + + } + + @Override + public void replayOnAborted(TransactionState txnState) { + + } + + @Override + public void afterVisible(TransactionState txnState, boolean txnOperated) { + + } +} diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index dec7e05cc3..12011a3dc5 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -566,7 +566,7 @@ public class GlobalTransactionMgr { * @param errorReplicaIds * @return */ - public void finishTransaction(long transactionId, Set errorReplicaIds) { + public void finishTransaction(long transactionId, Set errorReplicaIds) throws UserException { TransactionState transactionState = idToTransactionState.get(transactionId); // add all commit errors and publish errors to a single set if (errorReplicaIds == null) { @@ -699,14 +699,17 @@ public class GlobalTransactionMgr { if (hasError) { return; } + boolean txnOperated = false; writeLock(); try { transactionState.setErrorReplicas(errorReplicaIds); transactionState.setFinishTime(System.currentTimeMillis()); transactionState.setTransactionStatus(TransactionStatus.VISIBLE); unprotectUpsertTransactionState(transactionState); + txnOperated = true; } finally { writeUnlock(); + transactionState.afterStateTransform(TransactionStatus.VISIBLE, txnOperated); } updateCatalogAfterVisible(transactionState, db); } finally { diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index eb22dfbb3c..0c04143117 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -326,6 +326,9 @@ public class TransactionState implements Writable { case COMMITTED: callback.afterCommitted(this, txnOperated); break; + case VISIBLE: + callback.afterVisible(this, txnOperated); + break; default: break; } diff --git a/fe/src/main/java/org/apache/doris/transaction/TxnStateChangeCallback.java b/fe/src/main/java/org/apache/doris/transaction/TxnStateChangeCallback.java index d0a4ebb8cd..ca02c37ef6 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TxnStateChangeCallback.java +++ b/fe/src/main/java/org/apache/doris/transaction/TxnStateChangeCallback.java @@ -21,7 +21,7 @@ import org.apache.doris.common.UserException; public interface TxnStateChangeCallback { - public long getId(); + long getId(); /** * this interface is executed before txn committed, it will check if txn could be commit @@ -29,7 +29,7 @@ public interface TxnStateChangeCallback { * @throws TransactionException if transaction could not be commit or there are some exception before committed, * it will throw this exception. The txn will be committed failed. */ - public void beforeCommitted(TransactionState txnState) throws TransactionException; + void beforeCommitted(TransactionState txnState) throws TransactionException; /** * this interface is executed before txn aborted, it will check if txn could be abort @@ -38,16 +38,16 @@ public interface TxnStateChangeCallback { * @throws TransactionException if transaction could not be abort or there are some exception before aborted, * it will throw this exception. The txn will be aborted failed. */ - public void beforeAborted(TransactionState txnState) throws TransactionException; + void beforeAborted(TransactionState txnState) throws TransactionException; /** * update catalog of job which has related txn after transaction has been committed * * @param txnState */ - public void afterCommitted(TransactionState txnState, boolean txnOperated) throws UserException; + void afterCommitted(TransactionState txnState, boolean txnOperated) throws UserException; - public void replayOnCommitted(TransactionState txnState); + void replayOnCommitted(TransactionState txnState); /** * this interface is executed when transaction has been aborted @@ -57,7 +57,9 @@ public interface TxnStateChangeCallback { * maybe null * @return */ - public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) throws UserException; + void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) throws UserException; - public void replayOnAborted(TransactionState txnState); + void replayOnAborted(TransactionState txnState); + + void afterVisible(TransactionState txnState, boolean txnOperated); }