From 86be6d27e7b5286f4e1d40d64f2c24a3301915ab Mon Sep 17 00:00:00 2001 From: Tiewei Fang <43782773+BePPPower@users.noreply.github.com> Date: Fri, 28 Apr 2023 09:27:23 +0800 Subject: [PATCH] [Enhencement](Cancel Export) Cancel export support to cancel IN_QUEUE state export job (#19058) --- .../Manipulation/CANCEL-EXPORT.md | 7 +- .../Manipulation/CANCEL-EXPORT.md | 7 +- .../doris/analysis/CancelExportStmt.java | 95 ++++++++++--------- .../java/org/apache/doris/load/ExportJob.java | 17 +++- .../java/org/apache/doris/load/ExportMgr.java | 7 +- .../doris/task/ExportExportingTask.java | 9 ++ .../doris/analysis/CancelExportStmtTest.java | 12 +++ 7 files changed, 99 insertions(+), 55 deletions(-) diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/CANCEL-EXPORT.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/CANCEL-EXPORT.md index 6671c20625..247ab04727 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/CANCEL-EXPORT.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/CANCEL-EXPORT.md @@ -39,7 +39,7 @@ This statement is used to undo an export job for the specified label. Or batch u ```sql CANCEL EXPORT [FROM db_name] -WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/EXPORTING"] +WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/IN_QUEUE/EXPORTING"] ``` ### Example @@ -49,7 +49,7 @@ WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/EX ```sql CANCEL EXPORT FROM example_db - WHERE LABEL = "example_db_test_export_label"; + WHERE LABEL = "example_db_test_export_label" and STATE = "EXPORTING"; ```` 2. Cancel all export jobs containing example* on the database example*db. @@ -74,5 +74,6 @@ WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/EX ### Best Practice -1. 
Only pending export jobs in PENDING, EXPORTING state can be canceled. +1. Only export jobs in the PENDING, IN_QUEUE, or EXPORTING state can be canceled. 2. When performing batch undo, Doris does not guarantee the atomic undo of all corresponding export jobs. That is, it is possible that only some of the export jobs were successfully undone. The user can view the job status through the SHOW EXPORT statement and try to execute the CANCEL EXPORT statement repeatedly. +3. When a job in the `EXPORTING` state is canceled, part of the data may already have been exported to the storage system; the user needs to handle (delete) that partially exported data manually. diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/CANCEL-EXPORT.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/CANCEL-EXPORT.md index e47845e9eb..fa52826ba1 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/CANCEL-EXPORT.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/CANCEL-EXPORT.md @@ -39,7 +39,7 @@ CANCEL EXPORT ```sql CANCEL EXPORT [FROM db_name] -WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/EXPORTING"] +WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/IN_QUEUE/EXPORTING"] ``` ### Example @@ -49,7 +49,7 @@ WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/EX ```sql CANCEL EXPORT FROM example_db - WHERE LABEL = "example_db_test_export_label"; + WHERE LABEL = "example_db_test_export_label" and STATE = "EXPORTING"; ``` 2. 撤销数据库 example*db 上, 所有包含 example* 的 EXPORT 作业。 @@ -74,5 +74,6 @@ WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/EX ### Best Practice -1. 只能取消处于 PENDING、EXPORTING 状态的未完成的导入作业。 +1. 只能取消处于 PENDING、IN_QUEUE、EXPORTING 状态的未完成的导出作业。 2. 
当执行批量撤销时,Doris 不会保证所有对应的 EXPORT 作业原子的撤销。即有可能仅有部分 EXPORT 作业撤销成功。用户可以通过 SHOW EXPORT 语句查看作业状态,并尝试重复执行 CANCEL EXPORT 语句。 +3. 当撤销`EXPORTING`状态的作业时,有可能作业已经导出部分数据到存储系统上,用户需要自行处理(删除)该部分导出数据。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java index 71998ba226..ca55761420 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java @@ -22,23 +22,25 @@ import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.load.ExportJob; +import org.apache.doris.load.ExportJob.JobState; import com.google.common.base.Strings; -import com.google.common.collect.Sets; +import com.google.common.collect.ImmutableSet; import lombok.Getter; -import java.util.Set; - /** * CANCEL EXPORT statement used to cancel export job. 
* syntax: - * CANCEL EXPORT [FROM db] WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/EXPORTING"] + * CANCEL EXPORT [FROM db] + * WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/IN_QUEUE/EXPORTING"] **/ public class CancelExportStmt extends DdlStmt { - private static final Set SUPPORT_COLUMNS = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); - + private static final ImmutableSet SUPPORT_COLUMNS = new ImmutableSet.Builder() + .add("label") + .add("state") + .build(); @Getter private String dbName; @@ -56,13 +58,11 @@ public class CancelExportStmt extends DdlStmt { public CancelExportStmt(String dbName, Expr whereClause) { this.dbName = dbName; this.whereClause = whereClause; - this.SUPPORT_COLUMNS.add("label"); - this.SUPPORT_COLUMNS.add("state"); } private void checkColumn(Expr expr, boolean like) throws AnalysisException { String inputCol = ((SlotRef) expr.getChild(0)).getColumnName(); - if (!SUPPORT_COLUMNS.contains(inputCol)) { + if (!SUPPORT_COLUMNS.contains(inputCol.toLowerCase())) { throw new AnalysisException("Current only support label and state, invalid column: " + inputCol); } if (!(expr.getChild(1) instanceof StringLiteral)) { @@ -83,62 +83,55 @@ public class CancelExportStmt extends DdlStmt { throw new AnalysisException("Only label can use like"); } state = inputValue; - try { - ExportJob.JobState jobState = ExportJob.JobState.valueOf(state); - if (jobState != ExportJob.JobState.PENDING && jobState != ExportJob.JobState.EXPORTING) { - throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state); - } - } catch (IllegalArgumentException e) { - throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state); + ExportJob.JobState jobState = ExportJob.JobState.valueOf(state); + if (jobState != ExportJob.JobState.PENDING + && jobState != JobState.IN_QUEUE + && jobState != ExportJob.JobState.EXPORTING) { + throw new AnalysisException("Only support 
PENDING/IN_QUEUE/EXPORTING, invalid state: " + state); } } } private void likeCheck(Expr expr) throws AnalysisException { - if (expr instanceof LikePredicate) { - LikePredicate likePredicate = (LikePredicate) expr; - boolean like = LikePredicate.Operator.LIKE.equals(likePredicate.getOp()); - if (!like) { - throw new AnalysisException("Not support REGEXP"); - } - checkColumn(expr, true); + LikePredicate likePredicate = (LikePredicate) expr; + boolean like = LikePredicate.Operator.LIKE.equals(likePredicate.getOp()); + if (!like) { + throw new AnalysisException("Not support REGEXP"); } + checkColumn(expr, true); } private void binaryCheck(Expr expr) throws AnalysisException { - if (expr instanceof BinaryPredicate) { - BinaryPredicate binaryPredicate = (BinaryPredicate) expr; - if (!Operator.EQ.equals(binaryPredicate.getOp())) { - throw new AnalysisException("Only support equal or like"); - } - checkColumn(expr, false); + BinaryPredicate binaryPredicate = (BinaryPredicate) expr; + if (!Operator.EQ.equals(binaryPredicate.getOp())) { + throw new AnalysisException("Only support equal or like"); } + checkColumn(expr, false); } private void compoundCheck(Expr expr) throws AnalysisException { - if (expr == null) { - throw new AnalysisException("Where clause can't be null"); + // current only support label and state + CompoundPredicate compoundPredicate = (CompoundPredicate) expr; + if (CompoundPredicate.Operator.NOT == compoundPredicate.getOp()) { + throw new AnalysisException("Current not support NOT operator"); } - if (expr instanceof CompoundPredicate) { - // current only support label and state - CompoundPredicate compoundPredicate = (CompoundPredicate) expr; - if (CompoundPredicate.Operator.NOT == compoundPredicate.getOp()) { - throw new AnalysisException("Current not support NOT operator"); - } - for (int i = 0; i < 2; i++) { - Expr child = compoundPredicate.getChild(i); - if (child instanceof CompoundPredicate) { - throw new AnalysisException("Current not support 
nested clause"); - } + for (int i = 0; i < 2; i++) { + Expr child = compoundPredicate.getChild(i); + if (child instanceof CompoundPredicate) { + throw new AnalysisException("Current not support nested clause"); + } else if (child instanceof LikePredicate) { likeCheck(child); + } else if (child instanceof BinaryPredicate) { binaryCheck(child); + } else { + throw new AnalysisException("Only support like/binary predicate"); } - operator = compoundPredicate.getOp(); } + operator = compoundPredicate.getOp(); } @Override - public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); if (Strings.isNullOrEmpty(dbName)) { dbName = analyzer.getDefaultDb(); @@ -149,9 +142,17 @@ public class CancelExportStmt extends DdlStmt { dbName = ClusterNamespace.getFullName(getClusterName(), dbName); } - likeCheck(whereClause); - binaryCheck(whereClause); - compoundCheck(whereClause); + if (null == whereClause) { + throw new AnalysisException("Where clause can't be null"); + } else if (whereClause instanceof LikePredicate) { + likeCheck(whereClause); + } else if (whereClause instanceof BinaryPredicate) { + binaryCheck(whereClause); + } else if (whereClause instanceof CompoundPredicate) { + compoundCheck(whereClause); + } else { + throw new AnalysisException("Only support like/binary/compound predicate"); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 53dd160a27..ac1dec8d7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -55,6 +55,7 @@ import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; import org.apache.doris.system.Backend; import org.apache.doris.task.AgentClient; +import org.apache.doris.task.ExportExportingTask; import 
org.apache.doris.thrift.TAgentResult; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloScanRange; @@ -151,7 +152,6 @@ public class ExportJob implements Writable { private ExportFailMsg failMsg; private String outfileInfo; - private TableRef tableRef; private Expr whereExpr; @@ -176,6 +176,8 @@ public class ExportJob implements Writable { private Thread doExportingThread; + private ExportExportingTask task; + private List tabletLocations = Lists.newArrayList(); // backend_address => snapshot path private List> snapshotPaths = Lists.newArrayList(); @@ -442,6 +444,14 @@ public class ExportJob implements Writable { return sql; } + public ExportExportingTask getTask() { + return task; + } + + public void setTask(ExportExportingTask task) { + this.task = task; + } + public TableName getTableName() { return tableName; } @@ -484,6 +494,7 @@ public class ExportJob implements Writable { if (isFinalState() || (isReplay && newState == JobState.EXPORTING)) { return false; } + ExportJob.JobState oldState = state; state = newState; switch (newState) { case PENDING: @@ -501,6 +512,10 @@ public class ExportJob implements Writable { // if isReplay == true, finishTimeMs will be read from log if (!isReplay) { finishTimeMs = System.currentTimeMillis(); + // maybe user cancel this job + if (task != null && oldState == JobState.EXPORTING && task.getStmtExecutor() != null) { + task.getStmtExecutor().cancel(); + } } progress = 100; break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index aa064226dd..43a3e138c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -114,11 +114,11 @@ public class ExportMgr extends MasterDaemon { for (ExportJob job : newInQueueJobs) { try { MasterTask task = new ExportExportingTask(job); + job.setTask((ExportExportingTask) task); if 
(exportingExecutor.submit(task)) { LOG.info("success to submit IN_QUEUE export job. job: {}", job); } else { LOG.info("fail to submit IN_QUEUE job to executor. job: {}", job); - } } catch (Exception e) { LOG.warn("run export exporting job {}.", job, e); @@ -127,6 +127,11 @@ public class ExportMgr extends MasterDaemon { } private boolean handlePendingJobs(ExportJob job) { + // because maybe this job has been cancelled by user. + if (job.getState() != JobState.PENDING) { + return false; + } + if (job.isReplayed()) { // If the job is created from replay thread, all plan info will be lost. // so the job has to be cancelled. diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java index 08af959e22..ba0843d5e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java @@ -59,6 +59,10 @@ public class ExportExportingTask extends MasterTask { this.signature = job.getId(); } + public StmtExecutor getStmtExecutor() { + return stmtExecutor; + } + @Override protected void exec() { if (job.getState() == JobState.IN_QUEUE) { @@ -85,6 +89,11 @@ public class ExportExportingTask extends MasterTask { List outfileInfoList = Lists.newArrayList(); // begin exporting for (int i = 0; i < selectStmtList.size(); ++i) { + // maybe user cancelled this job + if (job.getState() != JobState.EXPORTING) { + isFailed = true; + break; + } try (AutoCloseConnectContext r = buildConnectContext()) { this.stmtExecutor = new StmtExecutor(r.connectContext, selectStmtList.get(i)); this.stmtExecutor.execute(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java index b8e54bf86d..c06cae0500 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java @@ -158,12 +158,15 @@ public class CancelExportStmtTest extends TestWithFeService { ExportJob job3 = new ExportJob(); job3.updateState(ExportJob.JobState.EXPORTING, false); ExportJob job4 = new ExportJob(); + ExportJob job5 = new ExportJob(); + job5.updateState(ExportJob.JobState.IN_QUEUE, false); exportJobList1.add(job1); exportJobList1.add(job2); exportJobList1.add(job3); exportJobList1.add(job4); exportJobList2.add(job1); exportJobList2.add(job2); + exportJobList2.add(job5); SlotRef stateSlotRef = new SlotRef(null, "state"); StringLiteral stateStringLiteral = new StringLiteral("PENDING"); @@ -185,6 +188,15 @@ public class CancelExportStmtTest extends TestWithFeService { Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 1); + stateStringLiteral = new StringLiteral("IN_QUEUE"); + stateEqPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral); + stmt = new CancelExportStmt(null, stateEqPredicate); + stmt.analyze(analyzer); + filter = ExportMgr.buildCancelJobFilter(stmt); + + Assert.assertTrue(exportJobList2.stream().filter(filter).count() == 1); + } }