diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index b8b4ce56b3..77260ba95f 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -3993,6 +3993,10 @@ cancel_param ::= {: RESULT = new CancelLoadStmt(db, parser.where); :} + | KW_EXPORT opt_db:db opt_wild_where + {: + RESULT = new CancelExportStmt(db, parser.where); + :} | KW_ALTER KW_TABLE opt_alter_type:type KW_FROM table_name:table cancel_rollup_job_id_list:list {: RESULT = new CancelAlterTableStmt(type, table, list); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java new file mode 100644 index 0000000000..71998ba226 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelExportStmt.java @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.analysis.BinaryPredicate.Operator; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.load.ExportJob; + +import com.google.common.base.Strings; +import com.google.common.collect.Sets; +import lombok.Getter; + +import java.util.Set; + + +/** + * CANCEL EXPORT statement used to cancel export job. + * syntax: + * CANCEL EXPORT [FROM db] WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/EXPORTING"] + **/ +public class CancelExportStmt extends DdlStmt { + + private static final Set SUPPORT_COLUMNS = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); + + @Getter + private String dbName; + + @Getter + private CompoundPredicate.Operator operator; + + @Getter + private String label; + + @Getter + private String state; + + private Expr whereClause; + + public CancelExportStmt(String dbName, Expr whereClause) { + this.dbName = dbName; + this.whereClause = whereClause; + this.SUPPORT_COLUMNS.add("label"); + this.SUPPORT_COLUMNS.add("state"); + } + + private void checkColumn(Expr expr, boolean like) throws AnalysisException { + String inputCol = ((SlotRef) expr.getChild(0)).getColumnName(); + if (!SUPPORT_COLUMNS.contains(inputCol)) { + throw new AnalysisException("Current only support label and state, invalid column: " + inputCol); + } + if (!(expr.getChild(1) instanceof StringLiteral)) { + throw new AnalysisException("Value must be a string"); + } + + String inputValue = expr.getChild(1).getStringValue(); + if (Strings.isNullOrEmpty(inputValue)) { + throw new AnalysisException("Value can't be null"); + } + + if (inputCol.equalsIgnoreCase("label")) { + label = inputValue; + } + + if (inputCol.equalsIgnoreCase("state")) { + if (like) { + throw new AnalysisException("Only label can use like"); + } + state = inputValue; + try { + ExportJob.JobState jobState = ExportJob.JobState.valueOf(state); + if (jobState != ExportJob.JobState.PENDING && jobState != ExportJob.JobState.EXPORTING) { + throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state); + } + } catch (IllegalArgumentException e) { + throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state); + } + } + } + + private void likeCheck(Expr expr) throws AnalysisException { + if (expr instanceof LikePredicate) { + LikePredicate likePredicate = (LikePredicate) expr; + boolean like = LikePredicate.Operator.LIKE.equals(likePredicate.getOp()); + if (!like) { + throw new AnalysisException("Not support REGEXP"); + } + checkColumn(expr, true); + } + } + + private void binaryCheck(Expr expr) throws AnalysisException { + if (expr instanceof BinaryPredicate) { + BinaryPredicate binaryPredicate = (BinaryPredicate) expr; + if (!Operator.EQ.equals(binaryPredicate.getOp())) { + throw new AnalysisException("Only support equal or like"); + } + checkColumn(expr, false); + } + } + + private void compoundCheck(Expr expr) throws AnalysisException { + if (expr == null) { + throw new AnalysisException("Where clause can't be null"); + } + if (expr instanceof CompoundPredicate) { + // current only support label and state + CompoundPredicate compoundPredicate = (CompoundPredicate) expr; + if (CompoundPredicate.Operator.NOT == compoundPredicate.getOp()) { + throw new AnalysisException("Current not support NOT operator"); + } + for (int i = 0; i < 2; i++) { + Expr child = compoundPredicate.getChild(i); + if (child instanceof CompoundPredicate) { + throw new AnalysisException("Current not support nested clause"); + } + likeCheck(child); + binaryCheck(child); + } + operator = compoundPredicate.getOp(); + } + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + super.analyze(analyzer); + if (Strings.isNullOrEmpty(dbName)) { + dbName = analyzer.getDefaultDb(); + if (Strings.isNullOrEmpty(dbName)) { + throw new AnalysisException("No database selected"); + } + } else { + dbName = ClusterNamespace.getFullName(getClusterName(), dbName); + } + + likeCheck(whereClause); + binaryCheck(whereClause); + compoundCheck(whereClause); + } + + @Override + public String toSql() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("CANCEL EXPORT "); + if (!Strings.isNullOrEmpty(dbName)) { + stringBuilder.append("FROM ").append(dbName); + } + + if (whereClause != null) { + stringBuilder.append(" WHERE ").append(whereClause.toSql()); + } + return stringBuilder.toString(); + } + + @Override + public String toString() { + return toSql(); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java index c08f6370a4..2ca9970677 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CancelLoadStmt.java @@ -123,6 +123,9 @@ public class CancelLoadStmt extends DdlStmt { if (expr instanceof CompoundPredicate) { // current only support label and state CompoundPredicate compoundPredicate = (CompoundPredicate) expr; + if (CompoundPredicate.Operator.NOT == compoundPredicate.getOp()) { + throw new AnalysisException("Current not support NOT operator"); + } for (int i = 0; i < 2; i++) { Expr child = compoundPredicate.getChild(i); if (child instanceof CompoundPredicate) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 00057dfd67..7f681548df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -521,7 +521,7 @@ public class ExportJob implements Writable { return whereExpr; } - public JobState getState() { + public synchronized JobState getState() { return state; } @@ -651,11 +651,12 @@ public class ExportJob implements Writable { } public synchronized void cancel(ExportFailMsg.CancelType type, String msg) { - releaseSnapshotPaths(); if (msg != null) { failMsg = new ExportFailMsg(type, msg); } - updateState(ExportJob.JobState.CANCELLED, false); + if (updateState(ExportJob.JobState.CANCELLED, false)) { + releaseSnapshotPaths(); + } } public synchronized boolean updateState(ExportJob.JobState newState) { @@ -663,6 +664,9 @@ public class ExportJob implements Writable { } public synchronized boolean updateState(ExportJob.JobState newState, boolean isReplay) { + if (isFinalState()) { + return false; + } state = newState; switch (newState) { case PENDING: @@ -686,6 +690,10 @@ public class ExportJob implements Writable { return true; } + public synchronized boolean isFinalState() { + return this.state == ExportJob.JobState.CANCELLED || this.state == ExportJob.JobState.FINISHED; + } + public Status releaseSnapshotPaths() { List> snapshotPaths = getSnapshotPaths(); LOG.debug("snapshotPaths:{}", snapshotPaths); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index b332f7ed25..cdee254f8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -17,6 +17,8 @@ package org.apache.doris.load; +import org.apache.doris.analysis.CancelExportStmt; +import org.apache.doris.analysis.CompoundPredicate; import org.apache.doris.analysis.ExportStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Database; @@ -24,6 +26,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.PatternMatcher; @@ -33,10 +36,12 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; +import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -48,6 +53,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; import java.util.stream.Collectors; public class ExportMgr { @@ -99,11 +105,62 @@ public class ExportMgr { LOG.info("add export job. {}", job); } + public void cancelExportJob(CancelExportStmt stmt) throws DdlException, AnalysisException { + // List of export jobs waiting to be cancelled + List matchExportJobs = getWaitingCancelJobs(stmt); + if (matchExportJobs.isEmpty()) { + throw new DdlException("Export job(s) do not exist"); + } + matchExportJobs = matchExportJobs.stream() + .filter(job -> !job.isFinalState()).collect(Collectors.toList()); + if (matchExportJobs.isEmpty()) { + throw new DdlException("All export job(s) are at final state (CANCELLED/FINISHED)"); + } + for (ExportJob exportJob : matchExportJobs) { + exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel"); + } + } + public void unprotectAddJob(ExportJob job) { idToJob.put(job.getId(), job); labelToJobId.putIfAbsent(job.getLabel(), job.getId()); } + private List getWaitingCancelJobs(CancelExportStmt stmt) throws AnalysisException { + Predicate jobFilter = buildCancelJobFilter(stmt); + readLock(); + try { + return getJobs().stream().filter(jobFilter).collect(Collectors.toList()); + } finally { + readUnlock(); + } + } + + @VisibleForTesting + public static Predicate buildCancelJobFilter(CancelExportStmt stmt) throws AnalysisException { + String label = stmt.getLabel(); + String state = stmt.getState(); + PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility()); + + return job -> { + boolean labelFilter = true; + boolean stateFilter = true; + if (StringUtils.isNotEmpty(label)) { + labelFilter = label.contains("%") ? matcher.match(job.getLabel()) : + job.getLabel().equalsIgnoreCase(label); + } + if (StringUtils.isNotEmpty(state)) { + stateFilter = job.getState().name().equalsIgnoreCase(state); + } + + if (stmt.getOperator() != null && CompoundPredicate.Operator.OR.equals(stmt.getOperator())) { + return labelFilter || stateFilter; + } + + return labelFilter && stateFilter; + }; + } + private ExportJob createJob(long jobId, ExportStmt stmt) throws Exception { ExportJob job = new ExportJob(jobId); job.setJob(stmt); @@ -294,12 +351,12 @@ public class ExportMgr { } public void replayUpdateJobState(long jobId, ExportJob.JobState newState) { - writeLock(); + readLock(); try { ExportJob job = idToJob.get(jobId); job.updateState(newState, true); } finally { - writeUnlock(); + readUnlock(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index e6f5702f2e..11b45513d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -47,6 +47,7 @@ import org.apache.doris.analysis.BackupStmt; import org.apache.doris.analysis.CancelAlterSystemStmt; import org.apache.doris.analysis.CancelAlterTableStmt; import org.apache.doris.analysis.CancelBackupStmt; +import org.apache.doris.analysis.CancelExportStmt; import org.apache.doris.analysis.CancelLoadStmt; import org.apache.doris.analysis.CleanLabelStmt; import org.apache.doris.analysis.CreateCatalogStmt; @@ -185,6 +186,8 @@ public class DdlExecutor { } else { env.getLoadManager().createLoadJobFromStmt(loadStmt); } + } else if (ddlStmt instanceof CancelExportStmt) { + env.getExportMgr().cancelExportJob((CancelExportStmt) ddlStmt); } else if (ddlStmt instanceof CancelLoadStmt) { env.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt); } else if (ddlStmt instanceof CreateRoutineLoadStmt) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java index 4f8084a272..8066e280a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java @@ -161,18 +161,17 @@ public class ExportExportingTask extends MasterTask { } } - // release snapshot - Status releaseSnapshotStatus = job.releaseSnapshotPaths(); - if (!releaseSnapshotStatus.ok()) { - // even if release snapshot failed, do nothing cancel this job. - // snapshot will be removed by GC thread on BE, finally. - LOG.warn("failed to release snapshot for export job: {}. err: {}", job.getId(), - releaseSnapshotStatus.getErrorMsg()); - } - if (job.updateState(ExportJob.JobState.FINISHED)) { LOG.warn("export job success. job: {}", job); registerProfile(); + // release snapshot + Status releaseSnapshotStatus = job.releaseSnapshotPaths(); + if (!releaseSnapshotStatus.ok()) { + // even if release snapshot failed, do not cancel this job. + // snapshot will be removed by GC thread on BE, finally. + LOG.warn("failed to release snapshot for export job: {}. err: {}", job.getId(), + releaseSnapshotStatus.getErrorMsg()); + } } synchronized (this) { @@ -336,12 +335,9 @@ public class ExportExportingTask extends MasterTask { } } - if (!failed) { - exportedFiles.clear(); - job.addExportedFiles(newFiles); - ClientPool.brokerPool.returnObject(address, client); - } - + exportedFiles.clear(); + job.addExportedFiles(newFiles); + ClientPool.brokerPool.returnObject(address, client); return Status.OK; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java new file mode 100644 index 0000000000..30be49e031 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.analysis.CompoundPredicate.Operator; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ExceptionChecker; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.UserException; +import org.apache.doris.load.ExportJob; +import org.apache.doris.load.ExportMgr; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Lists; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.wildfly.common.Assert; + +import java.util.List; +import java.util.function.Predicate; + +public class CancelExportStmtTest extends TestWithFeService { + + private Analyzer analyzer; + private String dbName = "testDb"; + private String tblName = "table1"; + + @Override + protected void runBeforeAll() throws Exception { + FeConstants.runningUnitTest = true; + createDatabase(dbName); + useDatabase(dbName); + createTable("create table " + tblName + "\n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n" + + "properties(\"replication_num\" = \"1\");"); + analyzer = new Analyzer(connectContext.getEnv(), connectContext); + } + + @Test + public void testNormal() throws UserException { + SlotRef labelSlotRef = new SlotRef(null, "label"); + StringLiteral labelStringLiteral = new StringLiteral("doris_test_label"); + + SlotRef stateSlotRef = new SlotRef(null, "state"); + + BinaryPredicate labelBinaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, + labelStringLiteral); + CancelExportStmt stmt = new CancelExportStmt(null, labelBinaryPredicate); + stmt.analyze(analyzer); + Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `label` = 'doris_test_label'", + stmt.toString()); + + SlotRef labelSlotRefUpper = new SlotRef(null, "LABEL"); + BinaryPredicate labelBinaryPredicateUpper = new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRefUpper, + labelStringLiteral); + CancelExportStmt stmtUpper = new CancelExportStmt(null, labelBinaryPredicateUpper); + stmtUpper.analyze(analyzer); + Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `LABEL` = 'doris_test_label'", + stmtUpper.toString()); + + StringLiteral stateStringLiteral = new StringLiteral("PENDING"); + BinaryPredicate stateBinaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, + stateStringLiteral); + stmt = new CancelExportStmt(null, stateBinaryPredicate); + stmt.analyze(analyzer); + Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `state` = 'PENDING'", stmt.toString()); + + LikePredicate labelLikePredicate = new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef, + labelStringLiteral); + stmt = new CancelExportStmt(null, labelLikePredicate); + stmt.analyze(analyzer); + Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `label` LIKE 'doris_test_label'", + stmt.toString()); + + CompoundPredicate compoundAndPredicate = new CompoundPredicate(Operator.AND, labelBinaryPredicate, + stateBinaryPredicate); + stmt = new CancelExportStmt(null, compoundAndPredicate); + stmt.analyze(analyzer); + Assertions.assertEquals( + "CANCEL EXPORT FROM default_cluster:testDb WHERE `label` = 'doris_test_label' AND `state` = 'PENDING'", + stmt.toString()); + + CompoundPredicate compoundOrPredicate = new CompoundPredicate(Operator.OR, labelBinaryPredicate, + stateBinaryPredicate); + stmt = new CancelExportStmt(null, compoundOrPredicate); + stmt.analyze(analyzer); + Assertions.assertEquals( + "CANCEL EXPORT FROM default_cluster:testDb WHERE `label` = 'doris_test_label' OR `state` = 'PENDING'", + stmt.toString()); + } + + @Test + public void testError1() { + SlotRef stateSlotRef = new SlotRef(null, "state"); + StringLiteral stateStringLiteral = new StringLiteral("FINISHED"); + + LikePredicate stateLikePredicate = + new LikePredicate(LikePredicate.Operator.LIKE, stateSlotRef, stateStringLiteral); + CancelExportStmt stmt = new CancelExportStmt(null, stateLikePredicate); + ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Only label can use like", + () -> stmt.analyze(analyzer)); + } + + @Test + public void testError2() { + SlotRef stateSlotRef = new SlotRef(null, "state"); + StringLiteral stateStringLiteral1 = new StringLiteral("EXPORTING"); + BinaryPredicate stateEqPredicate1 = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral1); + + StringLiteral stateStringLiteral2 = new StringLiteral("PENDING"); + BinaryPredicate stateEqPredicate2 = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral2); + + SlotRef labelSlotRef = new SlotRef(null, "label"); + StringLiteral labelStringLiteral1 = new StringLiteral("test_label"); + BinaryPredicate labelEqPredicate1 = + new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, labelStringLiteral1); + + CompoundPredicate compoundAndPredicate1 = new CompoundPredicate(Operator.AND, stateEqPredicate1, + stateEqPredicate2); + CompoundPredicate compoundAndPredicate2 = new CompoundPredicate(Operator.AND, compoundAndPredicate1, + labelEqPredicate1); + CompoundPredicate compoundAndPredicate3 = new CompoundPredicate(Operator.NOT, stateEqPredicate1, null); + + + CancelExportStmt stmt1 = new CancelExportStmt(null, compoundAndPredicate2); + ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Current not support nested clause", + () -> stmt1.analyze(analyzer)); + + + CancelExportStmt stmt2 = new CancelExportStmt(null, compoundAndPredicate3); + ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Current not support NOT operator", + () -> stmt2.analyze(analyzer)); + } + + @Test + public void testCancelJobFilter() throws UserException { + List exportJobList1 = Lists.newLinkedList(); + List exportJobList2 = Lists.newLinkedList(); + ExportJob job1 = new ExportJob(); + ExportJob job2 = new ExportJob(); + job2.updateState(ExportJob.JobState.CANCELLED, true); + ExportJob job3 = new ExportJob(); + job3.updateState(ExportJob.JobState.EXPORTING, true); + ExportJob job4 = new ExportJob(); + exportJobList1.add(job1); + exportJobList1.add(job2); + exportJobList1.add(job3); + exportJobList1.add(job4); + exportJobList2.add(job1); + exportJobList2.add(job2); + + SlotRef stateSlotRef = new SlotRef(null, "state"); + StringLiteral stateStringLiteral = new StringLiteral("PENDING"); + BinaryPredicate stateEqPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral); + CancelExportStmt stmt = new CancelExportStmt(null, stateEqPredicate); + stmt.analyze(analyzer); + Predicate filter = ExportMgr.buildCancelJobFilter(stmt); + + Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 2); + Assert.assertTrue(exportJobList2.stream().filter(filter).count() == 1); + + stateStringLiteral = new StringLiteral("EXPORTING"); + stateEqPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral); + stmt = new CancelExportStmt(null, stateEqPredicate); + stmt.analyze(analyzer); + filter = ExportMgr.buildCancelJobFilter(stmt); + + Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 1); + + } + +}