From 3fd3dfe16f6c2d4289fdc91c96f3426be6a77e8f Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Tue, 30 Apr 2024 10:51:24 +0800 Subject: [PATCH] [Feat](Job) Job supports task execution statistics (#34109) * Support statistics * - Fix Failed task not showing up in the task list - Task metadata add jobName - Fix Finished job clear time error - Job metadata add successCount, failedCount, totalTaskCount * add test --- .../apache/doris/job/base/AbstractJob.java | 35 ++++- .../job/extensions/insert/InsertJob.java | 140 +++++++++++------- .../job/extensions/insert/InsertTask.java | 42 ++++-- .../doris/job/extensions/mtmv/MTMVTask.java | 4 +- .../apache/doris/job/manager/JobManager.java | 5 +- .../doris/job/scheduler/JobScheduler.java | 2 +- .../apache/doris/job/task/AbstractTask.java | 5 +- .../java/org/apache/doris/job/task/Task.java | 2 +- .../tablefunction/MetadataGenerator.java | 2 +- .../suites/job_p0/test_base_insert_job.groovy | 19 +-- 10 files changed, 157 insertions(+), 99 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 8c1d6da3a6..3f595d6daf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -51,6 +51,7 @@ import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -58,7 +59,11 @@ import java.util.stream.Collectors; @Data @Log4j2 public abstract class AbstractJob implements Job, Writable { - + public static final ImmutableList COMMON_SCHEMA = ImmutableList.of( + new Column("SucceedTaskCount", ScalarType.createStringType()), + new Column("FailedTaskCount", ScalarType.createStringType()), + new Column("CanceledTaskCount", ScalarType.createStringType()) + ); @SerializedName(value = "jid") private Long jobId; @@ -92,6 +97,16 @@ public abstract class AbstractJob implements Job implements Job(); + logUpdateOperation(); } private static final ImmutableList TITLE_NAMES = @@ -290,14 +306,18 @@ public abstract class AbstractJob implements Job implements Job implements Job implements Job> implements GsonPostProcessable { - public static final ImmutableList SCHEMA = ImmutableList.of( - new Column("Id", ScalarType.createStringType()), - new Column("Name", ScalarType.createStringType()), - new Column("Definer", ScalarType.createStringType()), - new Column("ExecuteType", ScalarType.createStringType()), - new Column("RecurringStrategy", ScalarType.createStringType()), - new Column("Status", ScalarType.createStringType()), - new Column("ExecuteSql", ScalarType.createStringType()), - new Column("CreateTime", ScalarType.createStringType()), - new Column("Comment", ScalarType.createStringType())); + public static final ImmutableList SCHEMA = ImmutableList.builder() + .add(new Column("Id", ScalarType.createStringType())) + .add(new Column("Name", ScalarType.createStringType())) + .add(new Column("Definer", ScalarType.createStringType())) + .add(new Column("ExecuteType", ScalarType.createStringType())) + .add(new Column("RecurringStrategy", ScalarType.createStringType())) + .add(new Column("Status", ScalarType.createStringType())) + .add(new Column("ExecuteSql", ScalarType.createStringType())) + .add(new Column("CreateTime", ScalarType.createStringType())) + .addAll(COMMON_SCHEMA) + .add(new Column("Comment", ScalarType.createStringType())) + .build(); private static final ShowResultSetMetaData TASK_META_DATA = ShowResultSetMetaData.builder() @@ -126,8 +126,10 @@ public class InsertJob extends AbstractJob> impl COLUMN_TO_INDEX = builder.build(); } + //we used insertTaskQueue to store the task info, and we will query the task info from it + @Deprecated @SerializedName("tis") - ConcurrentLinkedQueue historyTaskIdList; + ConcurrentLinkedQueue historyTaskIdList = new ConcurrentLinkedQueue<>(); @SerializedName("did") private final long dbId; @SerializedName("ln") @@ -146,7 +148,9 @@ public class InsertJob extends AbstractJob> impl private List plans = new ArrayList<>(); private LoadStatistic loadStatistic = new LoadStatistic(); private Set finishedTaskIds = new HashSet<>(); - private ConcurrentHashMap idToTasks = new ConcurrentHashMap<>(); + + @SerializedName("tas") + private ConcurrentLinkedQueue insertTaskQueue = new ConcurrentLinkedQueue<>(); private Map properties = new HashMap<>(); private Set tableNames; private AuthorizationInfo authorizationInfo; @@ -164,8 +168,8 @@ public class InsertJob extends AbstractJob> impl if (null == plans) { plans = new ArrayList<>(); } - if (null == idToTasks) { - idToTasks = new ConcurrentHashMap<>(); + if (null == insertTaskQueue) { + insertTaskQueue = new ConcurrentLinkedQueue<>(); } if (null == loadStatistic) { loadStatistic = new LoadStatistic(); @@ -182,6 +186,15 @@ public class InsertJob extends AbstractJob> impl if (null == historyTaskIdList) { historyTaskIdList = new ConcurrentLinkedQueue<>(); } + if (null == getSucceedTaskCount()) { + setSucceedTaskCount(new AtomicLong(0)); + } + if (null == getFailedTaskCount()) { + setFailedTaskCount(new AtomicLong(0)); + } + if (null == getCanceledTaskCount()) { + setCanceledTaskCount(new AtomicLong(0)); + } } /** @@ -250,9 +263,7 @@ public class InsertJob extends AbstractJob> impl List newTasks = new ArrayList<>(); if (plans.isEmpty()) { InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser()); - idToTasks.put(task.getTaskId(), task); newTasks.add(task); - recordTask(task.getTaskId()); } else { // use for load stmt for (InsertIntoTableCommand logicalPlan : plans) { @@ -260,28 +271,24 @@ public class InsertJob extends AbstractJob> impl throw new IllegalArgumentException("Load plan need label name."); } InsertTask task = new InsertTask(logicalPlan, ctx, stmtExecutor, loadStatistic); - idToTasks.put(task.getTaskId(), task); newTasks.add(task); - recordTask(task.getTaskId()); } } initTasks(newTasks, taskType); + recordTasks(newTasks); return new ArrayList<>(newTasks); } - public void recordTask(long id) { + public void recordTasks(List tasks) { if (Config.max_persistence_task_count < 1) { return; } - if (CollectionUtils.isEmpty(historyTaskIdList)) { - historyTaskIdList = new ConcurrentLinkedQueue<>(); - historyTaskIdList.add(id); - Env.getCurrentEnv().getEditLog().logUpdateJob(this); - return; - } - historyTaskIdList.add(id); - if (historyTaskIdList.size() >= Config.max_persistence_task_count) { - historyTaskIdList.poll(); + insertTaskQueue.addAll(tasks); + + while (insertTaskQueue.size() > Config.max_persistence_task_count) { + insertTaskQueue.poll(); + //since we have insertTaskQueue, we do not need to store the task id in historyTaskIdList, so we clear it + historyTaskIdList.clear(); } Env.getCurrentEnv().getEditLog().logUpdateJob(this); } @@ -319,35 +326,54 @@ public class InsertJob extends AbstractJob> impl @Override public List queryTasks() { - if (CollectionUtils.isEmpty(historyTaskIdList)) { + if (historyTaskIdList.isEmpty() && insertTaskQueue.isEmpty()) { return new ArrayList<>(); } + //TODO it's will be refactor, we will storage task info in job inner and query from it - List taskIdList = new ArrayList<>(this.historyTaskIdList); + + // merge task info from insertTaskQueue and historyTaskIdList + List taskIds = insertTaskQueue.stream().map(InsertTask::getTaskId).collect(Collectors.toList()); + taskIds.addAll(historyTaskIdList); + taskIds.stream().distinct().collect(Collectors.toList()); if (getJobConfig().getExecuteType().equals(JobExecuteType.INSTANT)) { - Collections.reverse(taskIdList); - return queryLoadTasksByTaskIds(taskIdList); + return queryLoadTasksByTaskIds(taskIds); } - List loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIdList); - if (CollectionUtils.isEmpty(loadJobs)) { - return new ArrayList<>(); + // query from load job + List loadJobs = Env.getCurrentEnv().getLoadManager().queryLoadJobsByJobIds(taskIds); + + Map loadJobMap = loadJobs.stream().collect(Collectors.toMap(LoadJob::getId, loadJob -> loadJob)); + List tasksRsp = new ArrayList<>(); + //read task info from insertTaskQueue + insertTaskQueue.forEach(task -> { + if (task.getJobInfo() == null) { + LoadJob loadJob = loadJobMap.get(task.getTaskId()); + if (loadJob != null) { + task.setJobInfo(loadJob); + } + } + tasksRsp.add(task); + }); + if (CollectionUtils.isEmpty(historyTaskIdList)) { + return tasksRsp; } - List tasks = new ArrayList<>(); - loadJobs.forEach(loadJob -> { - InsertTask task; - try { - task = new InsertTask(loadJob.getLabel(), loadJob.getDb().getFullName(), null, getCreateUser()); - task.setCreateTimeMs(loadJob.getCreateTimestamp()); - } catch (MetaNotFoundException e) { - log.warn("load job not found, job id is {}", loadJob.getId()); + + historyTaskIdList.forEach(historyTaskId -> { + LoadJob loadJob = loadJobMap.get(historyTaskId); + if (null == loadJob) { return; } + InsertTask task = new InsertTask(loadJob.getLabel(), getCurrentDbName(), null, getCreateUser()); task.setJobId(getJobId()); task.setTaskId(loadJob.getId()); task.setJobInfo(loadJob); - tasks.add(task); + task.setJobId(getJobId()); + task.setTaskId(loadJob.getId()); + task.setJobInfo(loadJob); + tasksRsp.add(task); }); - return tasks; + return tasksRsp; + } @@ -355,13 +381,13 @@ public class InsertJob extends AbstractJob> impl if (taskIdList.isEmpty()) { return new ArrayList<>(); } - List tasks = new ArrayList<>(); - taskIdList.forEach(id -> { - if (null != idToTasks.get(id)) { - tasks.add(idToTasks.get(id)); + List queryTasks = new ArrayList<>(); + insertTaskQueue.forEach(task -> { + if (taskIdList.contains(task.getTaskId())) { + queryTasks.add(task); } }); - return tasks; + return queryTasks; } @Override @@ -462,7 +488,7 @@ public class InsertJob extends AbstractJob> impl // load end time jobInfo.add(TimeUtils.longToTimeString(getFinishTimeMs())); // tracking urls - List trackingUrl = idToTasks.values().stream() + List trackingUrl = insertTaskQueue.stream() .map(task -> { if (StringUtils.isNotEmpty(task.getTrackingUrl())) { return task.getTrackingUrl(); @@ -527,7 +553,7 @@ public class InsertJob extends AbstractJob> impl public void updateLoadingStatus(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows, long scannedBytes, boolean isDone) { loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone); - progress = (int) ((double) finishedTaskIds.size() / idToTasks.size() * 100); + progress = (int) ((double) finishedTaskIds.size() / insertTaskQueue.size() * 100); if (progress == 100) { progress = 99; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 8fe786555c..6e6f59758b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -37,6 +37,7 @@ import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.gson.annotations.SerializedName; import lombok.Getter; import lombok.Setter; import lombok.extern.log4j.Log4j2; @@ -52,11 +53,12 @@ public class InsertTask extends AbstractTask { public static final ImmutableList SCHEMA = ImmutableList.of( new Column("TaskId", ScalarType.createStringType()), new Column("JobId", ScalarType.createStringType()), + new Column("JobName", ScalarType.createStringType()), new Column("Label", ScalarType.createStringType()), new Column("Status", ScalarType.createStringType()), new Column("ErrorMsg", ScalarType.createStringType()), - new Column("CreateTimeMs", ScalarType.createStringType()), - new Column("FinishTimeMs", ScalarType.createStringType()), + new Column("CreateTime", ScalarType.createStringType()), + new Column("FinishTime", ScalarType.createStringType()), new Column("TrackingUrl", ScalarType.createStringType()), new Column("LoadStatistic", ScalarType.createStringType()), new Column("User", ScalarType.createStringType())); @@ -64,7 +66,7 @@ public class InsertTask extends AbstractTask { public static final ImmutableMap COLUMN_TO_INDEX; static { - ImmutableMap.Builder builder = new ImmutableMap.Builder(); + ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); for (int i = 0; i < SCHEMA.size(); i++) { builder.put(SCHEMA.get(i).getName().toLowerCase(), i); } @@ -77,6 +79,7 @@ public class InsertTask extends AbstractTask { private ConnectContext ctx; private String sql; private String currentDb; + @SerializedName(value = "uif") private UserIdentity userIdentity; private LoadStatistic loadStatistic; private AtomicBoolean isCanceled = new AtomicBoolean(false); @@ -211,42 +214,51 @@ public class InsertTask extends AbstractTask { } @Override - public TRow getTvfInfo() { + public TRow getTvfInfo(String jobName) { TRow trow = new TRow(); if (jobInfo == null) { // if task not start, load job is null,return pending task show info - return getPendingTaskTVFInfo(); + return getPendingTaskTVFInfo(jobName); } trow.addToColumnValue(new TCell().setStringVal(String.valueOf(jobInfo.getId()))); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId()))); - trow.addToColumnValue(new TCell().setStringVal(labelName)); + trow.addToColumnValue(new TCell().setStringVal(jobName)); + trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId())); trow.addToColumnValue(new TCell().setStringVal(jobInfo.getState().name())); // err msg - String errMsg = ""; + String errorMsg = ""; if (failMsg != null) { - errMsg = "type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg(); + errorMsg = failMsg.getMsg(); } - trow.addToColumnValue(new TCell().setStringVal(errMsg)); + if (StringUtils.isNotBlank(getErrMsg())) { + errorMsg = getErrMsg(); + } + trow.addToColumnValue(new TCell().setStringVal(errorMsg)); // create time - trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getCreateTimestamp()))); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs()))); // load end time - trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(jobInfo.getFinishTimestamp()))); + trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getFinishTimeMs()))); // tracking url trow.addToColumnValue(new TCell().setStringVal(trackingUrl)); - if (null != loadStatistic) { - trow.addToColumnValue(new TCell().setStringVal(loadStatistic.toJson())); + if (null != jobInfo.getLoadStatistic()) { + trow.addToColumnValue(new TCell().setStringVal(jobInfo.getLoadStatistic().toJson())); } else { trow.addToColumnValue(new TCell().setStringVal("")); } - trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); + if (userIdentity == null) { + trow.addToColumnValue(new TCell().setStringVal("")); + } else { + trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); + } return trow; } // if task not start, load job is null,return pending task show info - private TRow getPendingTaskTVFInfo() { + private TRow getPendingTaskTVFInfo(String jobName) { TRow trow = new TRow(); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getTaskId()))); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getJobId()))); + trow.addToColumnValue(new TCell().setStringVal(jobName)); trow.addToColumnValue(new TCell().setStringVal(getJobId() + LABEL_SPLITTER + getTaskId())); trow.addToColumnValue(new TCell().setStringVal(getStatus().name())); trow.addToColumnValue(new TCell().setStringVal("")); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index f67144cd0d..9e872cf034 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -299,11 +299,11 @@ public class MTMVTask extends AbstractTask { } @Override - public TRow getTvfInfo() { + public TRow getTvfInfo(String jobName) { TRow trow = new TRow(); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getTaskId()))); trow.addToColumnValue(new TCell().setStringVal(String.valueOf(super.getJobId()))); - trow.addToColumnValue(new TCell().setStringVal(super.getJobName())); + trow.addToColumnValue(new TCell().setStringVal(jobName)); String dbName = ""; String mvName = ""; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index d8a30a968a..4eb333426c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -29,8 +29,6 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.PatternMatcherWrapper; import org.apache.doris.common.io.Writable; -import org.apache.doris.common.util.LogBuilder; -import org.apache.doris.common.util.LogKey; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.job.base.AbstractJob; import org.apache.doris.job.common.JobStatus; @@ -267,8 +265,7 @@ public class JobManager, C> implements Writable { **/ public void replayUpdateJob(T job) { jobMap.put(job.getJobId(), job); - log.info(new LogBuilder(LogKey.SCHEDULER_JOB, job.getJobId()) - .add("msg", "replay update scheduler job").build()); + job.logUpdateOperation(); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index a104d3895e..2100511d22 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -196,7 +196,7 @@ public class JobScheduler, C> implements Closeable { } private void clearEndJob(T job) { - if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS < System.currentTimeMillis()) { + if (job.getFinishTimeMs() + FINISHED_JOB_CLEANUP_THRESHOLD_TIME_MS > System.currentTimeMillis()) { return; } try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index 71f6ff1c4f..7bd2e58f87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -63,7 +63,7 @@ public abstract class AbstractTask implements Task { } @Override - public void onFail(String msg) throws JobException { + public void onFail() throws JobException { status = TaskStatus.FAILED; if (!isCallable()) { return; @@ -72,12 +72,13 @@ public abstract class AbstractTask implements Task { } @Override - public void onFail() throws JobException { + public void onFail(String errMsg) throws JobException { if (TaskStatus.CANCELED.equals(status)) { return; } status = TaskStatus.FAILED; setFinishTimeMs(System.currentTimeMillis()); + setErrMsg(errMsg); if (!isCallable()) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java index 8e82984c2f..ee205c55c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/Task.java @@ -70,5 +70,5 @@ public interface Task { * get info for tvf `tasks` * @return TRow */ - TRow getTvfInfo(); + TRow getTvfInfo(String jobName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index 50d97e6970..d5b43b6cff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -809,7 +809,7 @@ public class MetadataGenerator { } List tasks = job.queryAllTasks(); for (AbstractTask task : tasks) { - TRow tvfInfo = task.getTvfInfo(); + TRow tvfInfo = task.getTvfInfo(job.getJobName()); if (tvfInfo != null) { dataBatch.add(tvfInfo); } diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index 8fe7c0e774..3f5cd5692f 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -118,13 +118,11 @@ suite("test_base_insert_job") { """ Thread.sleep(2000) - def onceJob = sql """ select id,ExecuteSql from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """ + def onceJob = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='ONE_TIME' """ assert onceJob.size() == 1 - def onceJobId = onceJob.get(0).get(0); - def onceJobSql = onceJob.get(0).get(1); - println onceJobSql - // test cancel task - def datas = sql """select status,taskid from tasks("type"="insert") where jobid= ${onceJobId}""" + //check succeed task count + assert '1' == onceJob.get(0).get(0) + def datas = sql """select status,taskid from tasks("type"="insert") where jobName= '${jobName}'""" println datas assert datas.size() == 1 assert datas.get(0).get(0) == "FINISHED" @@ -154,7 +152,7 @@ suite("test_base_insert_job") { DROP JOB IF EXISTS where jobname = '${jobName}' """ sql """ - CREATE JOB ${jobName} ON SCHEDULE every 1 second comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19',5, 1001); + CREATE JOB ${jobName} ON SCHEDULE every 1 second starts current_timestamp comment 'test for test&68686781jbjbhj//ncsa' DO insert into ${tableName} values ('2023-07-19',5, 1001); """ Thread.sleep(2000) @@ -162,15 +160,12 @@ suite("test_base_insert_job") { sql """ PAUSE JOB where jobname = '${jobName}' """ - def job = sql """ select id,ExecuteSql from jobs("type"="insert") where Name like '%${jobName}%' """ - assert job.size() == 1 - def jobId = job.get(0).get(0); - def tasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """ + def tasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """ sql """ RESUME JOB where jobname = '${jobName}' """ Thread.sleep(2500) - def afterResumeTasks = sql """ select status from tasks("type"="insert") where jobid= ${jobId} """ + def afterResumeTasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """ println afterResumeTasks assert afterResumeTasks.size() >tasks.size // assert same job name