[RoutineLoad] Support pause or resume all routine load jobs (#6394)

1. PAUSE ALL ROUTINE LOAD;
2. RESUME ALL ROUTINE LOAD;
This commit is contained in:
Mingyu Chen
2021-08-11 16:38:06 +08:00
committed by GitHub
parent 7e93405df3
commit 708b6c529e
9 changed files with 222 additions and 38 deletions

View File

@ -1757,6 +1757,10 @@ pause_routine_load_stmt ::=
{:
RESULT = new PauseRoutineLoadStmt(jobLabel);
:}
| KW_PAUSE KW_ALL KW_ROUTINE KW_LOAD
{:
RESULT = new PauseRoutineLoadStmt(null);
:}
;
resume_routine_load_stmt ::=
@ -1764,6 +1768,10 @@ resume_routine_load_stmt ::=
{:
RESULT = new ResumeRoutineLoadStmt(jobLabel);
:}
| KW_RESUME KW_ALL KW_ROUTINE KW_LOAD
{:
RESULT = new ResumeRoutineLoadStmt(null);
:}
;
stop_routine_load_stmt ::=

View File

@ -17,9 +17,13 @@
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import com.google.common.base.Strings;
/*
Pause routine load by name
@ -29,22 +33,35 @@ import org.apache.doris.common.UserException;
public class PauseRoutineLoadStmt extends DdlStmt {
private final LabelName labelName;
private String db;
public PauseRoutineLoadStmt(LabelName labelName) {
this.labelName = labelName;
}
public boolean isAll() {
return labelName == null;
}
public String getName() {
return labelName.getLabelName();
}
public String getDbFullName(){
return labelName.getDbName();
return db;
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
labelName.analyze(analyzer);
if (labelName != null) {
labelName.analyze(analyzer);
db = labelName.getDbName();
} else {
if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb());
}
}
}

View File

@ -17,9 +17,13 @@
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import com.google.common.base.Strings;
/*
Resume routine load job by name
@ -29,22 +33,35 @@ import org.apache.doris.common.UserException;
public class ResumeRoutineLoadStmt extends DdlStmt{
private final LabelName labelName;
private String db;
public ResumeRoutineLoadStmt(LabelName labelName) {
this.labelName = labelName;
}
public boolean isAll() {
return labelName == null;
}
public String getName() {
return labelName.getLabelName();
}
public String getDbFullName() {
return labelName.getDbName();
return db;
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
labelName.analyze(analyzer);
if (labelName != null) {
labelName.analyze(analyzer);
db = labelName.getDbName();
} else {
if (Strings.isNullOrEmpty(analyzer.getDefaultDb())) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
db = ClusterNamespace.getFullName(analyzer.getClusterName(), analyzer.getDefaultDb());
}
}
}

View File

@ -232,34 +232,93 @@ public class RoutineLoadManager implements Writable {
return routineLoadJob;
}
// get all jobs which state is not in final state from specified database
public List<RoutineLoadJob> checkPrivAndGetAllJobs(String dbName)
throws MetaNotFoundException, DdlException, AnalysisException {
List<RoutineLoadJob> result = Lists.newArrayList();
Database database = Catalog.getCurrentCatalog().getDb(dbName);
if (database == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
long dbId = database.getId();
Map<String, List<RoutineLoadJob>> jobMap = dbToNameToRoutineLoadJob.get(dbId);
if (jobMap == null) {
// return empty result
return result;
}
for (List<RoutineLoadJob> jobs : jobMap.values()) {
for (RoutineLoadJob job : jobs) {
if (!job.getState().isFinalState()) {
String tableName = job.getTableName();
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
dbName, tableName, PrivPredicate.LOAD)) {
continue;
}
result.add(job);
}
}
}
return result;
}
public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt)
throws UserException {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
pauseRoutineLoadStmt.getName());
List<RoutineLoadJob> jobs = Lists.newArrayList();
if (pauseRoutineLoadStmt.isAll()) {
jobs = checkPrivAndGetAllJobs(pauseRoutineLoadStmt.getDbFullName());
} else {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
pauseRoutineLoadStmt.getName());
jobs.add(routineLoadJob);
}
routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
"User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"),
false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("current_state",
routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg",
for (RoutineLoadJob routineLoadJob : jobs) {
try {
routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
"User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"),
false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("current_state",
routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg",
"routine load job has been paused by user").build());
} catch (UserException e) {
LOG.warn("failed to pause routine load job {}", routineLoadJob.getName(), e);
continue;
}
}
}
public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws UserException {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(),
resumeRoutineLoadStmt.getName());
routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
routineLoadJob.autoResumeCount = 0;
routineLoadJob.firstResumeTimestamp = 0;
routineLoadJob.autoResumeLock = false;
routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())
.add("user", ConnectContext.get().getQualifiedUser())
.add("msg", "routine load job has been resumed by user")
.build());
List<RoutineLoadJob> jobs = Lists.newArrayList();
if (resumeRoutineLoadStmt.isAll()) {
jobs = checkPrivAndGetAllJobs(resumeRoutineLoadStmt.getDbFullName());
} else {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(),
resumeRoutineLoadStmt.getName());
jobs.add(routineLoadJob);
}
for (RoutineLoadJob routineLoadJob : jobs) {
try {
routineLoadJob.jobStatistic.errorRowsAfterResumed = 0;
routineLoadJob.autoResumeCount = 0;
routineLoadJob.firstResumeTimestamp = 0;
routineLoadJob.autoResumeLock = false;
routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())
.add("user", ConnectContext.get().getQualifiedUser())
.add("msg", "routine load job has been resumed by user")
.build());
} catch (UserException e) {
LOG.warn("failed to resume routine load job {}", routineLoadJob.getName(), e);
continue;
}
}
}
public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt)

View File

@ -913,4 +913,68 @@ public class RoutineLoadManagerTest {
Assert.assertEquals(RoutineLoadJob.JobState.STOPPED, routineLoadJob.getState());
}
@Test
public void testPauseAndResumeAllRoutineLoadJob(@Injectable PauseRoutineLoadStmt pauseRoutineLoadStmt,
@Injectable ResumeRoutineLoadStmt resumeRoutineLoadStmt,
@Mocked Catalog catalog,
@Mocked Database database,
@Mocked PaloAuth paloAuth,
@Mocked ConnectContext connectContext) throws UserException {
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap();
Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap();
List<RoutineLoadJob> routineLoadJobList1 = Lists.newArrayList();
RoutineLoadJob routineLoadJob1 = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob1, "id", 1000L);
routineLoadJobList1.add(routineLoadJob1);
List<RoutineLoadJob> routineLoadJobList2 = Lists.newArrayList();
RoutineLoadJob routineLoadJob2 = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob2, "id", 1002L);
routineLoadJobList2.add(routineLoadJob2);
nameToRoutineLoadJob.put("job1", routineLoadJobList1);
nameToRoutineLoadJob.put("job2", routineLoadJobList2);
dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob);
Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob1.getState());
Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob1.getState());
new Expectations() {
{
pauseRoutineLoadStmt.isAll();
minTimes = 0;
result = true;
pauseRoutineLoadStmt.getDbFullName();
minTimes = 0;
result = "";
catalog.getDb("");
minTimes = 0;
result = database;
database.getId();
minTimes = 0;
result = 1L;
catalog.getAuth();
minTimes = 0;
result = paloAuth;
paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any);
minTimes = 0;
result = true;
resumeRoutineLoadStmt.isAll();
minTimes = 0;
result = true;
}
};
routineLoadManager.pauseRoutineLoadJob(pauseRoutineLoadStmt);
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob1.getState());
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob2.getState());
routineLoadManager.resumeRoutineLoadJob(resumeRoutineLoadStmt);
Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob1.getState());
Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob2.getState());
}
}