[Feat](Job) Job supports task execution statistics (#34109)
* Support statistics * - Fix Failed task not showing up in the task list - Task metadata add jobName - Fix Finished job clear time error - Job metadata add successCount, failedCount, totalTaskCount * add test
This commit is contained in:
@ -51,6 +51,7 @@ import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
@ -58,7 +59,11 @@ import java.util.stream.Collectors;
|
||||
@Data
|
||||
@Log4j2
|
||||
public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C>, Writable {
|
||||
|
||||
public static final ImmutableList<Column> COMMON_SCHEMA = ImmutableList.of(
|
||||
new Column("SucceedTaskCount", ScalarType.createStringType()),
|
||||
new Column("FailedTaskCount", ScalarType.createStringType()),
|
||||
new Column("CanceledTaskCount", ScalarType.createStringType())
|
||||
);
|
||||
@SerializedName(value = "jid")
|
||||
private Long jobId;
|
||||
|
||||
@ -92,6 +97,16 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
|
||||
@SerializedName(value = "sql")
|
||||
String executeSql;
|
||||
|
||||
|
||||
@SerializedName(value = "stc")
|
||||
private AtomicLong succeedTaskCount = new AtomicLong(0);
|
||||
|
||||
@SerializedName(value = "ftc")
|
||||
private AtomicLong failedTaskCount = new AtomicLong(0);
|
||||
|
||||
@SerializedName(value = "ctc")
|
||||
private AtomicLong canceledTaskCount = new AtomicLong(0);
|
||||
|
||||
public AbstractJob() {
|
||||
}
|
||||
|
||||
@ -142,6 +157,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
|
||||
task.cancel();
|
||||
}
|
||||
runningTasks = new CopyOnWriteArrayList<>();
|
||||
logUpdateOperation();
|
||||
}
|
||||
|
||||
private static final ImmutableList<String> TITLE_NAMES =
|
||||
@ -290,14 +306,18 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
|
||||
|
||||
@Override
|
||||
public void onTaskFail(T task) throws JobException {
|
||||
failedTaskCount.incrementAndGet();
|
||||
updateJobStatusIfEnd(false);
|
||||
runningTasks.remove(task);
|
||||
logUpdateOperation();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTaskSuccess(T task) throws JobException {
|
||||
succeedTaskCount.incrementAndGet();
|
||||
updateJobStatusIfEnd(true);
|
||||
runningTasks.remove(task);
|
||||
logUpdateOperation();
|
||||
|
||||
}
|
||||
|
||||
@ -309,12 +329,15 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
|
||||
}
|
||||
switch (executeType) {
|
||||
case ONE_TIME:
|
||||
updateJobStatus(JobStatus.FINISHED);
|
||||
this.finishTimeMs = System.currentTimeMillis();
|
||||
break;
|
||||
case INSTANT:
|
||||
this.finishTimeMs = System.currentTimeMillis();
|
||||
if (taskSuccess) {
|
||||
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED);
|
||||
updateJobStatus(JobStatus.FINISHED);
|
||||
} else {
|
||||
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.STOPPED);
|
||||
updateJobStatus(JobStatus.STOPPED);
|
||||
}
|
||||
break;
|
||||
case RECURRING:
|
||||
@ -322,7 +345,8 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
|
||||
if (null != timerDefinition.getEndTimeMs()
|
||||
&& timerDefinition.getEndTimeMs() < System.currentTimeMillis()
|
||||
+ timerDefinition.getIntervalUnit().getIntervalMs(timerDefinition.getInterval())) {
|
||||
Env.getCurrentEnv().getJobManager().getJob(jobId).updateJobStatus(JobStatus.FINISHED);
|
||||
this.finishTimeMs = System.currentTimeMillis();
|
||||
updateJobStatus(JobStatus.FINISHED);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
@ -360,6 +384,9 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
|
||||
trow.addToColumnValue(new TCell().setStringVal(jobStatus.name()));
|
||||
trow.addToColumnValue(new TCell().setStringVal(executeSql));
|
||||
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(createTimeMs)));
|
||||
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(succeedTaskCount.get())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(failedTaskCount.get())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(canceledTaskCount.get())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(comment));
|
||||
return trow;
|
||||
}
|
||||
|
||||
@ -30,7 +30,6 @@ import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.LabelAlreadyUsedException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.util.LogBuilder;
|
||||
import org.apache.doris.common.util.LogKey;
|
||||
@ -67,38 +66,39 @@ import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@Data
|
||||
@Slf4j
|
||||
@Log4j2
|
||||
public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> implements GsonPostProcessable {
|
||||
|
||||
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
|
||||
new Column("Id", ScalarType.createStringType()),
|
||||
new Column("Name", ScalarType.createStringType()),
|
||||
new Column("Definer", ScalarType.createStringType()),
|
||||
new Column("ExecuteType", ScalarType.createStringType()),
|
||||
new Column("RecurringStrategy", ScalarType.createStringType()),
|
||||
new Column("Status", ScalarType.createStringType()),
|
||||
new Column("ExecuteSql", ScalarType.createStringType()),
|
||||
new Column("CreateTime", ScalarType.createStringType()),
|
||||
new Column("Comment", ScalarType.createStringType()));
|
||||
public static final ImmutableList<Column> SCHEMA = ImmutableList.<Column>builder()
|
||||
.add(new Column("Id", ScalarType.createStringType()))
|
||||
.add(new Column("Name", ScalarType.createStringType()))
|
||||
.add(new Column("Definer", ScalarType.createStringType()))
|
||||
.add(new Column("ExecuteType", ScalarType.createStringType()))
|
||||
.add(new Column("RecurringStrategy", ScalarType.createStringType()))
|
||||
.add(new Column("Status", ScalarType.createStringType()))
|
||||
.add(new Column("ExecuteSql", ScalarType.createStringType()))
|
||||
.add(new Column("CreateTime", ScalarType.createStringType()))
|
||||
.addAll(COMMON_SCHEMA)
|
||||
.add(new Column("Comment", ScalarType.createStringType()))
|
||||
.build();
|
||||
|
||||
private static final ShowResultSetMetaData TASK_META_DATA =
|
||||
ShowResultSetMetaData.builder()
|
||||
@ -126,8 +126,10 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
|
||||
COLUMN_TO_INDEX = builder.build();
|
||||
}
|
||||
|
||||
//we used insertTaskQueue to store the task info, and we will query the task info from it
|
||||
@Deprecated
|
||||
@SerializedName("tis")
|
||||
ConcurrentLinkedQueue<Long> historyTaskIdList;
|
||||
ConcurrentLinkedQueue<Long> historyTaskIdList = new ConcurrentLinkedQueue<>();
|
||||
@SerializedName("did")
|
||||
private final long dbId;
|
||||
@SerializedName("ln")
|
||||
@ -146,7 +148,9 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
|
||||
private List<InsertIntoTableCommand> plans = new ArrayList<>();
|
||||
private LoadStatistic loadStatistic = new LoadStatistic();
|
||||
private Set<Long> finishedTaskIds = new HashSet<>();
|
||||
private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
|
||||
|
||||
@SerializedName("tas")
|
||||
private ConcurrentLinkedQueue<InsertTask> insertTaskQueue = new ConcurrentLinkedQueue<>();
|
||||
private Map<String, String> properties = new HashMap<>();
|
||||
private Set<String> tableNames;
|
||||
private AuthorizationInfo authorizationInfo;
|
||||
@ -164,8 +168,8 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
|
||||
if (null == plans) {
|
||||
plans = new ArrayList<>();
|
||||
}
|
||||
if (null == idToTasks) {
|
||||
idToTasks = new ConcurrentHashMap<>();
|
||||
if (null == insertTaskQueue) {
|
||||
insertTaskQueue = new ConcurrentLinkedQueue<>();
|
||||
}
|
||||
if (null == loadStatistic) {
|
||||
loadStatistic = new LoadStatistic();
|
||||
@ -182,6 +186,15 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
|
||||
if (null == historyTaskIdList) {
|
||||
historyTaskIdList = new ConcurrentLinkedQueue<>();
|
||||
}
|
||||
if (null == getSucceedTaskCount()) {
|
||||
setSucceedTaskCount(new AtomicLong(0));
|
||||
}
|
||||
if (null == getFailedTaskCount()) {
|
||||
setFailedTaskCount(new AtomicLong(0));
|
||||
}
|
||||
if (null == getCanceledTaskCount()) {
|
||||
setCanceledTaskCount(new AtomicLong(0));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -250,9 +263,7 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
|
||||
List<InsertTask> newTasks = new ArrayList<>();
|
||||
if (plans.isEmpty()) {
|
||||
InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser());
|
||||
idToTasks.put(task.getTaskId(), task);
|
||||
newTasks.add(task);
|
||||
recordTask(task.getTaskId());
|
||||
} else {
|
||||
// use for load stmt
|
||||
for (InsertIntoTableCommand logicalPlan : plans) {
|
||||
@ -260,28 +271,24 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
|
||||
throw new IllegalArgumentException("Load plan need label name.");
|
||||
}
|
||||
InsertTask task = new InsertTask(logicalPlan, ctx, stmtExecutor, loadStatistic);
|
||||
idToTasks.put(task.getTaskId(), task);
|
||||
newTasks.add(task);
|
||||
recordTask(task.getTaskId());
|
||||
}
|
||||
}
|
||||
initTasks(newTasks, taskType);
|
||||
recordTasks(newTasks);
|
||||
return new ArrayList<>(newTasks);
|
||||
}
|
||||
|
||||
public void recordTask(long id) {
|
||||
public void recordTasks(List<InsertTask> tasks) {
|
||||
if (Config.max_persistence_task_count < 1) {
|
||||
return;
|
||||
}
|
||||
if (CollectionUtils.isEmpty(historyTaskIdList)) {
|
||||
historyTaskIdList = new ConcurrentLinkedQueue<>();
|
||||
historyTaskIdList.add(id);
|
||||
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
|
||||
return;
|
||||
}
|
||||
historyTaskIdList.add(id);
|
||||
if (historyTaskIdList.size() >= Config.max_persistence_task_count) {
|
||||
historyTaskIdList.poll();
|
||||
insertTaskQueue.addAll(tasks);
|
||||
|
||||
while (insertTaskQueue.size() > Config.max_persistence_task_count) {
|
||||
insertTaskQueue.poll();
|
||||
//since we have insertTaskQueue, we do not need to store the task id in historyTaskIdList, so we clear it
|
||||
historyTaskIdList.clear();
|
||||
}
|
||||
Env.getCurrentEnv().getEditLog().logUpdateJob(this);
|
||||
}
|
||||
@ -319,35 +326,54 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
|
||||
|
||||
@Override
|
||||
public List<InsertTask> queryTasks() {
|
||||
if (CollectionUtils.isEmpty(historyTaskIdList)) {
|
||||
if (historyTaskIdList.isEmpty() && insertTaskQueue.isEmpty()) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
//TODO it's will be refactor, we will storage task info in job inner and query from it
|
||||
List<Long> taskIdList = new ArrayList<>(this.historyTaskIdList);
|
||||
|
||||
// merge task info from insertTaskQueue and historyTaskIdList
|
||||
List<Long> taskIds = insertTaskQueue.stream().map(InsertTask::getTaskId).collect(Collectors.toList());
|
||||
taskIds.addAll(historyTaskIdList);
|
||||
taskIds.stream().distinct().collect(Collectors.toList());
|
||||
if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) {
|
||||
Collections.reverse(taskIdList);
|
||||
return queryLoadTasksByTaskIds(taskIdList);
|
||||
return queryLoadTasksByTaskIds(taskIds);
|
||||
}
|
||||
List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList);
|
||||
if (CollectionUtils.isEmpty(loadJobs)) {
|
||||
return new ArrayList<>();
|
||||
// query from load job
|
||||
List<LoadJob> loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds);
|
||||
|
||||
Map<Long, LoadJob> loadJobMap = loadJobs.stream().collect(Collectors.toMap(LoadJob::getId, loadJob -> loadJob));
|
||||
List<InsertTask> tasksRsp = new ArrayList<>();
|
||||
//read task info from insertTaskQueue
|
||||
insertTaskQueue.forEach(task -> {
|
||||
if (task.getJobInfo() == null) {
|
||||
LoadJob loadJob = loadJobMap.get(task.getTaskId());
|
||||
if (loadJob != null) {
|
||||
task.setJobInfo(loadJob);
|
||||
}
|
||||
}
|
||||
tasksRsp.add(task);
|
||||
});
|
||||
if (CollectionUtils.isEmpty(historyTaskIdList)) {
|
||||
return tasksRsp;
|
||||
}
|
||||
List<InsertTask> tasks = new ArrayList<>();
|
||||
loadJobs.forEach(loadJob -> {
|
||||
InsertTask task;
|
||||
try {
|
||||
task = new InsertTask(loadJob.getLabel(), loadJob.getDb().getFullName(), null, getCreateUser());
|
||||
task.setCreateTimeMs(loadJob.getCreateTimestamp());
|
||||
} catch (MetaNotFoundException e) {
|
||||
log.warn("load job not found, job id is {}", loadJob.getId());
|
||||
|
||||
historyTaskIdList.forEach(historyTaskId -> {
|
||||
LoadJob loadJob = loadJobMap.get(historyTaskId);
|
||||
if (null == loadJob) {
|
||||
return;
|
||||
}
|
||||
InsertTask task = new InsertTask(loadJob.getLabel(), getCurrentDbName(), null, getCreateUser());
|
||||
task.setJobId(getJobId());
|
||||
task.setTaskId(loadJob.getId());
|
||||
task.setJobInfo(loadJob);
|
||||
tasks.add(task);
|
||||
task.setJobId(getJobId());
|
||||
task.setTaskId(loadJob.getId());
|
||||
task.setJobInfo(loadJob);
|
||||
tasksRsp.add(task);
|
||||
});
|
||||
return tasks;
|
||||
return tasksRsp;
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -355,13 +381,13 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
|
||||
if (taskIdList.isEmpty()) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
List<InsertTask> tasks = new ArrayList<>();
|
||||
taskIdList.forEach(id -> {
|
||||
if (null != idToTasks.get(id)) {
|
||||
tasks.add(idToTasks.get(id));
|
||||
List<InsertTask> queryTasks = new ArrayList<>();
|
||||
insertTaskQueue.forEach(task -> {
|
||||
if (taskIdList.contains(task.getTaskId())) {
|
||||
queryTasks.add(task);
|
||||
}
|
||||
});
|
||||
return tasks;
|
||||
return queryTasks;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -462,7 +488,7 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
|
||||
// load end time
|
||||
jobInfo.add(TimeUtils.longToTimeString(getFinishTimeMs()));
|
||||
// tracking urls
|
||||
List<String> trackingUrl = idToTasks.values().stream()
|
||||
List<String> trackingUrl = insertTaskQueue.stream()
|
||||
.map(task -> {
|
||||
if (StringUtils.isNotEmpty(task.getTrackingUrl())) {
|
||||
return task.getTrackingUrl();
|
||||
@ -527,7 +553,7 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
|
||||
public void updateLoadingStatus(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
|
||||
long scannedBytes, boolean isDone) {
|
||||
loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
|
||||
progress = (int) ((double) finishedTaskIds.size() / idToTasks.size() * 100);
|
||||
progress = (int) ((double) finishedTaskIds.size() / insertTaskQueue.size() * 100);
|
||||
if (progress == 100) {
|
||||
progress = 99;
|
||||
}
|
||||
|
||||
@ -37,6 +37,7 @@ import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
@ -52,11 +53,12 @@ public class InsertTask extends AbstractTask {
|
||||
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
|
||||
new Column("TaskId", ScalarType.createStringType()),
|
||||
new Column("JobId", ScalarType.createStringType()),
|
||||
new Column("JobName", ScalarType.createStringType()),
|
||||
new Column("Label", ScalarType.createStringType()),
|
||||
new Column("Status", ScalarType.createStringType()),
|
||||
new Column("ErrorMsg", ScalarType.createStringType()),
|
||||
new Column("CreateTimeMs", ScalarType.createStringType()),
|
||||
new Column("FinishTimeMs", ScalarType.createStringType()),
|
||||
new Column("CreateTime", ScalarType.createStringType()),
|
||||
new Column("FinishTime", ScalarType.createStringType()),
|
||||
new Column("TrackingUrl", ScalarType.createStringType()),
|
||||
new Column("LoadStatistic", ScalarType.createStringType()),
|
||||
new Column("User", ScalarType.createStringType()));
|
||||
@ -64,7 +66,7 @@ public class InsertTask extends AbstractTask {
|
||||
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
|
||||
|
||||
static {
|
||||
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
|
||||
ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
|
||||
for (int i = 0; i < SCHEMA.size(); i++) {
|
||||
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
|
||||
}
|
||||
@ -77,6 +79,7 @@ public class InsertTask extends AbstractTask {
|
||||
private ConnectContext ctx;
|
||||
private String sql;
|
||||
private String currentDb;
|
||||
@SerializedName(value = "uif")
|
||||
private UserIdentity userIdentity;
|
||||
private LoadStatistic loadStatistic;
|
||||
private AtomicBoolean isCanceled = new AtomicBoolean(false);
|
||||
@ -211,42 +214,51 @@ public class InsertTask extends AbstractTask {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TRow getTvfInfo() {
|
||||
public TRow getTvfInfo(String jobName) {
|
||||
TRow trow = new TRow();
|
||||
if (jobInfo == null) {
|
||||
// if task not start, load job is null,return pending task show info
|
||||
return getPendingTaskTVFInfo();
|
||||
return getPendingTaskTVFInfo(jobName);
|
||||
}
|
||||
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobInfo.getId())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(labelName));
|
||||
trow.addToColumnValue(new TCell().setStringVal(jobName));
|
||||
trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId()));
|
||||
trow.addToColumnValue(new TCell().setStringVal(jobInfo.getState().name()));
|
||||
// err msg
|
||||
String errMsg = "";
|
||||
String errorMsg = "";
|
||||
if (failMsg != null) {
|
||||
errMsg = "type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg();
|
||||
errorMsg = failMsg.getMsg();
|
||||
}
|
||||
trow.addToColumnValue(new TCell().setStringVal(errMsg));
|
||||
if (StringUtils.isNotBlank(getErrMsg())) {
|
||||
errorMsg = getErrMsg();
|
||||
}
|
||||
trow.addToColumnValue(new TCell().setStringVal(errorMsg));
|
||||
// create time
|
||||
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimestamp())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
|
||||
// load end time
|
||||
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimestamp())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getFinishTimeMs())));
|
||||
// tracking url
|
||||
trow.addToColumnValue(new TCell().setStringVal(trackingUrl));
|
||||
if (null != loadStatistic) {
|
||||
trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson()));
|
||||
if (null != jobInfo.getLoadStatistic()) {
|
||||
trow.addToColumnValue(new TCell().setStringVal(jobInfo.getLoadStatistic().toJson()));
|
||||
} else {
|
||||
trow.addToColumnValue(new TCell().setStringVal(""));
|
||||
}
|
||||
trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
|
||||
if (userIdentity == null) {
|
||||
trow.addToColumnValue(new TCell().setStringVal(""));
|
||||
} else {
|
||||
trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
|
||||
}
|
||||
return trow;
|
||||
}
|
||||
|
||||
// if task not start, load job is null,return pending task show info
|
||||
private TRow getPendingTaskTVFInfo() {
|
||||
private TRow getPendingTaskTVFInfo(String jobName) {
|
||||
TRow trow = new TRow();
|
||||
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getTaskId())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(jobName));
|
||||
trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId()));
|
||||
trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
|
||||
trow.addToColumnValue(new TCell().setStringVal(""));
|
||||
|
||||
@ -299,11 +299,11 @@ public class MTMVTask extends AbstractTask {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TRow getTvfInfo() {
|
||||
public TRow getTvfInfo(String jobName) {
|
||||
TRow trow = new TRow();
|
||||
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getTaskId())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId())));
|
||||
trow.addToColumnValue(new TCell().setStringVal(super.getJobName()));
|
||||
trow.addToColumnValue(new TCell().setStringVal(jobName));
|
||||
String dbName = "";
|
||||
String mvName = "";
|
||||
try {
|
||||
|
||||
@ -29,8 +29,6 @@ import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.PatternMatcher;
|
||||
import org.apache.doris.common.PatternMatcherWrapper;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.LogBuilder;
|
||||
import org.apache.doris.common.util.LogKey;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.job.base.AbstractJob;
|
||||
import org.apache.doris.job.common.JobStatus;
|
||||
@ -267,8 +265,7 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
|
||||
**/
|
||||
public void replayUpdateJob(T job) {
|
||||
jobMap.put(job.getJobId(), job);
|
||||
log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId())
|
||||
.add("msg", "replay update scheduler job").build());
|
||||
job.logUpdateOperation();
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -196,7 +196,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
|
||||
}
|
||||
|
||||
private void clearEndJob(T job) {
|
||||
if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS < System.currentTimeMillis()) {
|
||||
if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS > System.currentTimeMillis()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
|
||||
@ -63,7 +63,7 @@ public abstract class AbstractTask implements Task {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFail(String msg) throws JobException {
|
||||
public void onFail() throws JobException {
|
||||
status = TaskStatus.FAILED;
|
||||
if (!isCallable()) {
|
||||
return;
|
||||
@ -72,12 +72,13 @@ public abstract class AbstractTask implements Task {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFail() throws JobException {
|
||||
public void onFail(String errMsg) throws JobException {
|
||||
if (TaskStatus.CANCELED.equals(status)) {
|
||||
return;
|
||||
}
|
||||
status = TaskStatus.FAILED;
|
||||
setFinishTimeMs(System.currentTimeMillis());
|
||||
setErrMsg(errMsg);
|
||||
if (!isCallable()) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -70,5 +70,5 @@ public interface Task {
|
||||
* get info for tvf `tasks`
|
||||
* @return TRow
|
||||
*/
|
||||
TRow getTvfInfo();
|
||||
TRow getTvfInfo(String jobName);
|
||||
}
|
||||
|
||||
@ -809,7 +809,7 @@ public class MetadataGenerator {
|
||||
}
|
||||
List<AbstractTask> tasks = job.queryAllTasks();
|
||||
for (AbstractTask task : tasks) {
|
||||
TRow tvfInfo = task.getTvfInfo();
|
||||
TRow tvfInfo = task.getTvfInfo(job.getJobName());
|
||||
if (tvfInfo != null) {
|
||||
dataBatch.add(tvfInfo);
|
||||
}
|
||||
|
||||
@ -118,13 +118,11 @@ suite("test_base_insert_job") {
|
||||
"""
|
||||
|
||||
Thread.sleep(2000)
|
||||
def onceJob = sql """ select id,ExecuteSql from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """
|
||||
def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """
|
||||
assert onceJob.size() == 1
|
||||
def onceJobId = onceJob.get(0).get(0);
|
||||
def onceJobSql = onceJob.get(0).get(1);
|
||||
println onceJobSql
|
||||
// test cancel task
|
||||
def datas = sql """select status,taskid from tasks("type"="insert") where jobid= ${onceJobId}"""
|
||||
//check succeed task count
|
||||
assert '1' == onceJob.get(0).get(0)
|
||||
def datas = sql """select status,taskid from tasks("type"="insert") where jobName= '${jobName}'"""
|
||||
println datas
|
||||
assert datas.size() == 1
|
||||
assert datas.get(0).get(0) == "FINISHED"
|
||||
@ -154,7 +152,7 @@ suite("test_base_insert_job") {
|
||||
DROP JOB IF EXISTS where jobname = '${jobName}'
|
||||
"""
|
||||
sql """
|
||||
CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19',5, 1001);
|
||||
CREATE JOB ${jobName} ON SCHEDULE every 1 second starts current_timestamp comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19',5, 1001);
|
||||
"""
|
||||
|
||||
Thread.sleep(2000)
|
||||
@ -162,15 +160,12 @@ suite("test_base_insert_job") {
|
||||
sql """
|
||||
PAUSE JOB where jobname = '${jobName}'
|
||||
"""
|
||||
def job = sql """ select id,ExecuteSql from jobs("type"="insert") where Name like '%${jobName}%' """
|
||||
assert job.size() == 1
|
||||
def jobId = job.get(0).get(0);
|
||||
def tasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """
|
||||
def tasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """
|
||||
sql """
|
||||
RESUME JOB where jobname = '${jobName}'
|
||||
"""
|
||||
Thread.sleep(2500)
|
||||
def afterResumeTasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """
|
||||
def afterResumeTasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """
|
||||
println afterResumeTasks
|
||||
assert afterResumeTasks.size() >tasks.size
|
||||
// assert same job name
|
||||
|
||||
Reference in New Issue
Block a user