[Enhencement](Cancel Export) Cancel export support to cancel IN_QUEUE state export job (#19058)

This commit is contained in:
Tiewei Fang
2023-04-28 09:27:23 +08:00
committed by GitHub
parent 3082ed806f
commit 86be6d27e7
7 changed files with 99 additions and 55 deletions

View File

@ -22,23 +22,25 @@ import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportJob.JobState;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import com.google.common.collect.ImmutableSet;
import lombok.Getter;
import java.util.Set;
/**
* CANCEL EXPORT statement used to cancel export job.
* syntax:
* CANCEL EXPORT [FROM db] WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/EXPORTING"]
* CANCEL EXPORT [FROM db]
* WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/IN_QUEUE/EXPORTING"]
**/
public class CancelExportStmt extends DdlStmt {
private static final Set<String> SUPPORT_COLUMNS = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
private static final ImmutableSet<String> SUPPORT_COLUMNS = new ImmutableSet.Builder<String>()
.add("label")
.add("state")
.build();
@Getter
private String dbName;
@ -56,13 +58,11 @@ public class CancelExportStmt extends DdlStmt {
public CancelExportStmt(String dbName, Expr whereClause) {
this.dbName = dbName;
this.whereClause = whereClause;
this.SUPPORT_COLUMNS.add("label");
this.SUPPORT_COLUMNS.add("state");
}
private void checkColumn(Expr expr, boolean like) throws AnalysisException {
String inputCol = ((SlotRef) expr.getChild(0)).getColumnName();
if (!SUPPORT_COLUMNS.contains(inputCol)) {
if (!SUPPORT_COLUMNS.contains(inputCol.toLowerCase())) {
throw new AnalysisException("Current only support label and state, invalid column: " + inputCol);
}
if (!(expr.getChild(1) instanceof StringLiteral)) {
@ -83,62 +83,55 @@ public class CancelExportStmt extends DdlStmt {
throw new AnalysisException("Only label can use like");
}
state = inputValue;
try {
ExportJob.JobState jobState = ExportJob.JobState.valueOf(state);
if (jobState != ExportJob.JobState.PENDING && jobState != ExportJob.JobState.EXPORTING) {
throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state);
}
} catch (IllegalArgumentException e) {
throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state);
ExportJob.JobState jobState = ExportJob.JobState.valueOf(state);
if (jobState != ExportJob.JobState.PENDING
&& jobState != JobState.IN_QUEUE
&& jobState != ExportJob.JobState.EXPORTING) {
throw new AnalysisException("Only support PENDING/IN_QUEUE/EXPORTING, invalid state: " + state);
}
}
}
private void likeCheck(Expr expr) throws AnalysisException {
if (expr instanceof LikePredicate) {
LikePredicate likePredicate = (LikePredicate) expr;
boolean like = LikePredicate.Operator.LIKE.equals(likePredicate.getOp());
if (!like) {
throw new AnalysisException("Not support REGEXP");
}
checkColumn(expr, true);
LikePredicate likePredicate = (LikePredicate) expr;
boolean like = LikePredicate.Operator.LIKE.equals(likePredicate.getOp());
if (!like) {
throw new AnalysisException("Not support REGEXP");
}
checkColumn(expr, true);
}
private void binaryCheck(Expr expr) throws AnalysisException {
if (expr instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
if (!Operator.EQ.equals(binaryPredicate.getOp())) {
throw new AnalysisException("Only support equal or like");
}
checkColumn(expr, false);
BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
if (!Operator.EQ.equals(binaryPredicate.getOp())) {
throw new AnalysisException("Only support equal or like");
}
checkColumn(expr, false);
}
private void compoundCheck(Expr expr) throws AnalysisException {
if (expr == null) {
throw new AnalysisException("Where clause can't be null");
// current only support label and state
CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
if (CompoundPredicate.Operator.NOT == compoundPredicate.getOp()) {
throw new AnalysisException("Current not support NOT operator");
}
if (expr instanceof CompoundPredicate) {
// current only support label and state
CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
if (CompoundPredicate.Operator.NOT == compoundPredicate.getOp()) {
throw new AnalysisException("Current not support NOT operator");
}
for (int i = 0; i < 2; i++) {
Expr child = compoundPredicate.getChild(i);
if (child instanceof CompoundPredicate) {
throw new AnalysisException("Current not support nested clause");
}
for (int i = 0; i < 2; i++) {
Expr child = compoundPredicate.getChild(i);
if (child instanceof CompoundPredicate) {
throw new AnalysisException("Current not support nested clause");
} else if (child instanceof LikePredicate) {
likeCheck(child);
} else if (child instanceof BinaryPredicate) {
binaryCheck(child);
} else {
throw new AnalysisException("Only support like/binary predicate");
}
operator = compoundPredicate.getOp();
}
operator = compoundPredicate.getOp();
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
if (Strings.isNullOrEmpty(dbName)) {
dbName = analyzer.getDefaultDb();
@ -149,9 +142,17 @@ public class CancelExportStmt extends DdlStmt {
dbName = ClusterNamespace.getFullName(getClusterName(), dbName);
}
likeCheck(whereClause);
binaryCheck(whereClause);
compoundCheck(whereClause);
if (null == whereClause) {
throw new AnalysisException("Where clause can't be null");
} else if (whereClause instanceof LikePredicate) {
likeCheck(whereClause);
} else if (whereClause instanceof BinaryPredicate) {
binaryCheck(whereClause);
} else if (whereClause instanceof CompoundPredicate) {
compoundCheck(whereClause);
} else {
throw new AnalysisException("Only support like/binary/compound predicate");
}
}
@Override

View File

@ -55,6 +55,7 @@ import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.system.Backend;
import org.apache.doris.task.AgentClient;
import org.apache.doris.task.ExportExportingTask;
import org.apache.doris.thrift.TAgentResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloScanRange;
@ -151,7 +152,6 @@ public class ExportJob implements Writable {
private ExportFailMsg failMsg;
private String outfileInfo;
private TableRef tableRef;
private Expr whereExpr;
@ -176,6 +176,8 @@ public class ExportJob implements Writable {
private Thread doExportingThread;
private ExportExportingTask task;
private List<TScanRangeLocations> tabletLocations = Lists.newArrayList();
// backend_address => snapshot path
private List<Pair<TNetworkAddress, String>> snapshotPaths = Lists.newArrayList();
@ -442,6 +444,14 @@ public class ExportJob implements Writable {
return sql;
}
public ExportExportingTask getTask() {
return task;
}
public void setTask(ExportExportingTask task) {
this.task = task;
}
public TableName getTableName() {
return tableName;
}
@ -484,6 +494,7 @@ public class ExportJob implements Writable {
if (isFinalState() || (isReplay && newState == JobState.EXPORTING)) {
return false;
}
ExportJob.JobState oldState = state;
state = newState;
switch (newState) {
case PENDING:
@ -501,6 +512,10 @@ public class ExportJob implements Writable {
// if isReplay == true, finishTimeMs will be read from log
if (!isReplay) {
finishTimeMs = System.currentTimeMillis();
// maybe user cancel this job
if (task != null && oldState == JobState.EXPORTING && task.getStmtExecutor() != null) {
task.getStmtExecutor().cancel();
}
}
progress = 100;
break;

View File

@ -114,11 +114,11 @@ public class ExportMgr extends MasterDaemon {
for (ExportJob job : newInQueueJobs) {
try {
MasterTask task = new ExportExportingTask(job);
job.setTask((ExportExportingTask) task);
if (exportingExecutor.submit(task)) {
LOG.info("success to submit IN_QUEUE export job. job: {}", job);
} else {
LOG.info("fail to submit IN_QUEUE job to executor. job: {}", job);
}
} catch (Exception e) {
LOG.warn("run export exporting job {}.", job, e);
@ -127,6 +127,11 @@ public class ExportMgr extends MasterDaemon {
}
private boolean handlePendingJobs(ExportJob job) {
// because maybe this job has been cancelled by user.
if (job.getState() != JobState.PENDING) {
return false;
}
if (job.isReplayed()) {
// If the job is created from replay thread, all plan info will be lost.
// so the job has to be cancelled.

View File

@ -59,6 +59,10 @@ public class ExportExportingTask extends MasterTask {
this.signature = job.getId();
}
public StmtExecutor getStmtExecutor() {
return stmtExecutor;
}
@Override
protected void exec() {
if (job.getState() == JobState.IN_QUEUE) {
@ -85,6 +89,11 @@ public class ExportExportingTask extends MasterTask {
List<ExportJob.OutfileInfo> outfileInfoList = Lists.newArrayList();
// begin exporting
for (int i = 0; i < selectStmtList.size(); ++i) {
// maybe user cancelled this job
if (job.getState() != JobState.EXPORTING) {
isFailed = true;
break;
}
try (AutoCloseConnectContext r = buildConnectContext()) {
this.stmtExecutor = new StmtExecutor(r.connectContext, selectStmtList.get(i));
this.stmtExecutor.execute();