[Fix](Job)Replaying logs should not modify the original information of the job (#40474) (#40808)

…

## Proposed changes
```
        JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
        jobConfig.setExecuteType(JobExecuteType.INSTANT);
        setJobConfig(jobConfig);
```
- Replaying logs should not modify the original information of the job
- Use the new optimizer to check whether the executed statement is legal

(cherry picked from commit de90051162de7004cf171bbf4d21bd95ff9f3540)

## Proposed changes

Issue Number: #40474
This commit is contained in:
Calvin Kirs
2024-09-13 20:47:57 +08:00
committed by GitHub
parent c2ad828b0c
commit f9b79c613a
6 changed files with 103 additions and 43 deletions

View File

@ -32,15 +32,15 @@ import org.apache.doris.job.common.IntervalUnit;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.HashSet;
/**
* syntax:
* CREATE
@ -91,12 +91,6 @@ public class CreateJobStmt extends DdlStmt {
// exclude job name prefix, which is used by inner job
private static final String excludeJobNamePrefix = "inner_";
private static final ImmutableSet<Class<? extends DdlStmt>> supportStmtSuperClass
= new ImmutableSet.Builder<Class<? extends DdlStmt>>().add(InsertStmt.class)
.build();
private static final HashSet<String> supportStmtClassNamesCache = new HashSet<>(16);
public CreateJobStmt(LabelName labelName, JobExecuteType executeType, String onceJobStartTimestamp,
Long interval, String intervalTimeUnit,
String startsTimeStamp, String endsTimeStamp, String comment, StatementBase doStmt) {
@ -118,7 +112,6 @@ public class CreateJobStmt extends DdlStmt {
labelName.analyze(analyzer);
String dbName = labelName.getDbName();
Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName);
analyzerSqlStmt();
// check its insert stmt,currently only support insert stmt
//todo when support other stmt,need to check stmt type and generate jobInstance
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
@ -164,6 +157,7 @@ public class CreateJobStmt extends DdlStmt {
jobExecutionConfiguration.setTimerDefinition(timerDefinition);
String originStmt = getOrigStmt().originStmt;
String executeSql = parseExecuteSql(originStmt, jobName, comment);
analyzerSqlStmt(executeSql);
// create job use label name as its job name
InsertJob job = new InsertJob(jobName,
JobStatus.RUNNING,
@ -191,22 +185,20 @@ public class CreateJobStmt extends DdlStmt {
}
}
private void checkStmtSupport() throws AnalysisException {
if (supportStmtClassNamesCache.contains(doStmt.getClass().getSimpleName())) {
return;
}
for (Class<? extends DdlStmt> clazz : supportStmtSuperClass) {
if (clazz.isAssignableFrom(doStmt.getClass())) {
supportStmtClassNamesCache.add(doStmt.getClass().getSimpleName());
return;
private void analyzerSqlStmt(String sql) throws UserException {
NereidsParser parser = new NereidsParser();
LogicalPlan logicalPlan = parser.parseSingle(sql);
if (logicalPlan instanceof InsertIntoTableCommand) {
InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) logicalPlan;
try {
insertIntoTableCommand.initPlan(ConnectContext.get(), ConnectContext.get().getExecutor());
} catch (Exception e) {
throw new AnalysisException(e.getMessage());
}
}
throw new AnalysisException("Not support " + doStmt.getClass().getSimpleName() + " type in job");
}
private void analyzerSqlStmt() throws UserException {
checkStmtSupport();
doStmt.analyze(analyzer);
} else {
throw new AnalysisException("Not support this sql : " + sql);
}
}
/**

View File

@ -31,8 +31,6 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.job.base.AbstractJob;
@ -647,23 +645,8 @@ public class InsertJob extends AbstractJob<InsertTask, Map<Object, Object>> impl
@Override
public void onReplayCreate() throws JobException {
JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
jobConfig.setExecuteType(JobExecuteType.INSTANT);
setJobConfig(jobConfig);
onRegister();
checkJobParams();
log.info(new LogBuilder(LogKey.LOAD_JOB, getJobId()).add("msg", "replay create load job").build());
}
@Override
public void onReplayEnd(AbstractJob<?, Map<Object, Object>> replayJob) throws JobException {
if (!(replayJob instanceof InsertJob)) {
return;
}
InsertJob insertJob = (InsertJob) replayJob;
unprotectReadEndOperation(insertJob);
log.info(new LogBuilder(LogKey.LOAD_JOB,
insertJob.getJobId()).add("operation", insertJob).add("msg", "replay end load job").build());
super.onReplayCreate();
}
public int getProgress() {