From e352a0833951ba58b1d33aca472e765b644ef67d Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Fri, 19 Apr 2019 09:18:06 +0800 Subject: [PATCH] Change tips of show routine load task (#959) 1. Add pauseTimestamp 2. It will be set when job is paused and it will be removed when job is resumed --- .../apache/doris/analysis/ShowRoutineLoadStmt.java | 1 + .../doris/analysis/ShowRoutineLoadTaskStmt.java | 2 +- .../doris/load/routineload/RoutineLoadJob.java | 12 +++++++++--- .../doris/load/routineload/RoutineLoadJobTest.java | 2 +- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java index 6542d7d56e..2043dee538 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java @@ -67,6 +67,7 @@ public class ShowRoutineLoadStmt extends ShowStmt { .add("Id") .add("Name") .add("CreateTime") + .add("PauseTime") .add("EndTime") .add("DbName") .add("TableName") diff --git a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java index e2e11a82d5..a49994e773 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ShowRoutineLoadTaskStmt.java @@ -90,7 +90,7 @@ public class ShowRoutineLoadTaskStmt extends ShowStmt { private void checkJobNameExpr(Analyzer analyzer) throws AnalysisException { if (jobNameExpr == null) { - throw new AnalysisException("please designate a name in where expr such as name=xxx"); + throw new AnalysisException("please designate a jobName in where expr such as JobName=\"ILoveDoris\""); } boolean valid = true; diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 1a428c2eec..f76099d2cd 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -161,10 +161,11 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable protected int currentTaskConcurrentNum; protected RoutineLoadProgress progress; - protected String pausedReason; + protected String pauseReason; protected String cancelReason; protected long createTimestamp = System.currentTimeMillis(); + protected long pauseTimestamp = -1; protected long endTimestamp = -1; /* @@ -848,13 +849,15 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable private void executePause(String reason) { // remove all of task in jobs and change job state to paused - pausedReason = reason; + pauseReason = reason; state = JobState.PAUSED; + pauseTimestamp = System.currentTimeMillis(); routineLoadTaskInfoList.clear(); } private void executeNeedSchedule() { state = JobState.NEED_SCHEDULE; + pauseTimestamp = -1; routineLoadTaskInfoList.clear(); } @@ -961,6 +964,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable row.add(String.valueOf(id)); row.add(name); row.add(TimeUtils.longToTimeString(createTimestamp)); + row.add(TimeUtils.longToTimeString(pauseTimestamp)); row.add(TimeUtils.longToTimeString(endTimestamp)); row.add(db == null ? String.valueOf(dbId) : db.getFullName()); row.add(tbl == null ? String.valueOf(tableId) : tbl.getName()); @@ -973,7 +977,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable row.add(getProgress().toJsonString()); switch (state) { case PAUSED: - row.add(pausedReason); + row.add(pauseReason); break; case CANCELLED: row.add(cancelReason); @@ -1082,6 +1086,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable progress.write(out); out.writeLong(createTimestamp); + out.writeLong(pauseTimestamp); out.writeLong(endTimestamp); out.writeLong(currentErrorRows); @@ -1127,6 +1132,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable } createTimestamp = in.readLong(); + pauseTimestamp = in.readLong(); endTimestamp = in.readLong(); currentErrorRows = in.readLong(); diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java index 7c7b277cae..86b96e8e18 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java @@ -159,7 +159,7 @@ public class RoutineLoadJobTest { public void testGetShowInfo(@Mocked KafkaProgress kafkaProgress) { RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(); Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED); - Deencapsulation.setField(routineLoadJob, "pausedReason", + Deencapsulation.setField(routineLoadJob, "pauseReason", TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString()); Deencapsulation.setField(routineLoadJob, "progress", kafkaProgress);