[Chore](Job)Add the configuration of the maximum number of persistence tasks for the job (#28411)

This commit is contained in:
Calvin Kirs
2023-12-15 11:14:06 +08:00
committed by GitHub
parent 8661b5ec21
commit c4242ab69e
3 changed files with 25 additions and 18 deletions

View File

@ -20,6 +20,7 @@ package org.apache.doris.job.extensions.insert;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.job.base.AbstractJob;
@ -85,10 +86,7 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
ConnectContext ctx;
@SerializedName("tis")
ConcurrentLinkedQueue<Long> taskIdList;
// max save task num, do we need to config it?
private static final int MAX_SAVE_TASK_NUM = 100;
ConcurrentLinkedQueue<Long> historyTaskIdList;
@Override
public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
@ -100,21 +98,23 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
ArrayList<InsertTask> tasks = new ArrayList<>();
tasks.add(task);
super.initTasks(tasks);
addNewTask(task.getTaskId());
recordTask(task.getTaskId());
return tasks;
}
public void addNewTask(long id) {
if (CollectionUtils.isEmpty(taskIdList)) {
taskIdList = new ConcurrentLinkedQueue<>();
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
taskIdList.add(id);
public void recordTask(long id) {
if (Config.max_persistence_task_count < 1) {
return;
}
taskIdList.add(id);
if (taskIdList.size() >= MAX_SAVE_TASK_NUM) {
taskIdList.poll();
if (CollectionUtils.isEmpty(historyTaskIdList)) {
historyTaskIdList = new ConcurrentLinkedQueue<>();
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
historyTaskIdList.add(id);
return;
}
historyTaskIdList.add(id);
if (historyTaskIdList.size() >= Config.max_persistence_task_count) {
historyTaskIdList.poll();
}
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
}
@ -148,11 +148,11 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
@Override
public List<InsertTask> queryTasks() {
if (CollectionUtils.isEmpty(taskIdList)) {
if (CollectionUtils.isEmpty(historyTaskIdList)) {
return new ArrayList<>();
}
//TODO it's will be refactor, we will storage task info in job inner and query from it
List<Long> taskIdList = new ArrayList<>(this.taskIdList);
List<Long> taskIdList = new ArrayList<>(this.historyTaskIdList);
Collections.reverse(taskIdList);
List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList);
if (CollectionUtils.isEmpty(loadJobs)) {

View File

@ -17,6 +17,7 @@
package org.apache.doris.mtmv;
import org.apache.doris.common.Config;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import com.google.common.collect.Lists;
@ -43,8 +44,11 @@ public class MTMVJobInfo {
}
public void addHistoryTask(MTMVTask task) {
if (Config.max_persistence_task_count < 1) {
return;
}
historyTasks.add(task);
if (historyTasks.size() > MTMVTask.MAX_HISTORY_TASKS_NUM) {
if (historyTasks.size() > Config.max_persistence_task_count) {
historyTasks.removeFirst();
}
}