Change tips of show routine load task (#959)

1. Add pauseTimestamp
2. It will be set when job is paused and it will be removed when job is resumed
This commit is contained in:
EmmyMiao87
2019-04-19 09:18:06 +08:00
committed by ZHAO Chun
parent 178757d46c
commit e352a08339
4 changed files with 12 additions and 5 deletions

View File

@ -67,6 +67,7 @@ public class ShowRoutineLoadStmt extends ShowStmt {
.add("Id")
.add("Name")
.add("CreateTime")
.add("PauseTime")
.add("EndTime")
.add("DbName")
.add("TableName")

View File

@ -90,7 +90,7 @@ public class ShowRoutineLoadTaskStmt extends ShowStmt {
private void checkJobNameExpr(Analyzer analyzer) throws AnalysisException {
if (jobNameExpr == null) {
throw new AnalysisException("please designate a name in where expr such as name=xxx");
throw new AnalysisException("please designate a jobName in where expr such as JobName=\"ILoveDoris\"");
}
boolean valid = true;

View File

@ -161,10 +161,11 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
protected int currentTaskConcurrentNum;
protected RoutineLoadProgress progress;
protected String pausedReason;
protected String pauseReason;
protected String cancelReason;
protected long createTimestamp = System.currentTimeMillis();
protected long pauseTimestamp = -1;
protected long endTimestamp = -1;
/*
@ -848,13 +849,15 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
private void executePause(String reason) {
// remove all of task in jobs and change job state to paused
pausedReason = reason;
pauseReason = reason;
state = JobState.PAUSED;
pauseTimestamp = System.currentTimeMillis();
routineLoadTaskInfoList.clear();
}
private void executeNeedSchedule() {
state = JobState.NEED_SCHEDULE;
pauseTimestamp = -1;
routineLoadTaskInfoList.clear();
}
@ -961,6 +964,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
row.add(String.valueOf(id));
row.add(name);
row.add(TimeUtils.longToTimeString(createTimestamp));
row.add(TimeUtils.longToTimeString(pauseTimestamp));
row.add(TimeUtils.longToTimeString(endTimestamp));
row.add(db == null ? String.valueOf(dbId) : db.getFullName());
row.add(tbl == null ? String.valueOf(tableId) : tbl.getName());
@ -973,7 +977,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
row.add(getProgress().toJsonString());
switch (state) {
case PAUSED:
row.add(pausedReason);
row.add(pauseReason);
break;
case CANCELLED:
row.add(cancelReason);
@ -1082,6 +1086,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
progress.write(out);
out.writeLong(createTimestamp);
out.writeLong(pauseTimestamp);
out.writeLong(endTimestamp);
out.writeLong(currentErrorRows);
@ -1127,6 +1132,7 @@ public abstract class RoutineLoadJob implements TxnStateChangeListener, Writable
}
createTimestamp = in.readLong();
pauseTimestamp = in.readLong();
endTimestamp = in.readLong();
currentErrorRows = in.readLong();

View File

@ -159,7 +159,7 @@ public class RoutineLoadJobTest {
public void testGetShowInfo(@Mocked KafkaProgress kafkaProgress) {
RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
Deencapsulation.setField(routineLoadJob, "pausedReason",
Deencapsulation.setField(routineLoadJob, "pauseReason",
TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString());
Deencapsulation.setField(routineLoadJob, "progress", kafkaProgress);