diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index 5d31053017..22adf3e4d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -45,6 +45,7 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; @@ -615,6 +616,9 @@ public class ExportJob implements Writable { setExportJobState(ExportJobState.CANCELLED); finishTimeMs = System.currentTimeMillis(); failMsg = new ExportFailMsg(type, msg); + if (FeConstants.runningUnitTest) { + return; + } Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.CANCELLED); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java index 71cb62120c..55fe2a283b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java @@ -200,4 +200,155 @@ public class CancelExportStmtTest extends TestWithFeService { } + @Test + public void testExportMgrCancelJob() throws UserException { + FeConstants.runningUnitTest = true; + ExportJob job1 = new ExportJob(); + job1.setId(1); + job1.setLabel("label_job1"); + ExportJob job2 = new ExportJob(); + job2.setId(2); + job2.setLabel("label_job2"); + ExportJob job3 = new ExportJob(); + job3.setId(3); + job3.setLabel("label_job3"); + ExportJob job4 = new ExportJob(); + job4.setId(4); + job4.setLabel("label_job4"); + + try { + Method setExportJobState = job1.getClass().getDeclaredMethod("setExportJobState", + ExportJobState.class); + setExportJobState.setAccessible(true); + // job1 is PENDING + setExportJobState.invoke(job2, ExportJobState.EXPORTING); + setExportJobState.invoke(job3, ExportJobState.FINISHED); + setExportJobState.invoke(job4, ExportJobState.CANCELLED); + } catch (Exception e) { + throw new UserException(e); + } + + ExportMgr exportMgr = new ExportMgr(); + exportMgr.unprotectAddJob(job1); + exportMgr.unprotectAddJob(job2); + exportMgr.unprotectAddJob(job3); + exportMgr.unprotectAddJob(job4); + + + // cancel export job where state = "PENDING" + Assert.assertTrue(job1.getState() == ExportJobState.PENDING); + SlotRef stateSlotRef = new SlotRef(null, "state"); + StringLiteral stateStringLiteral = new StringLiteral("PENDING"); + BinaryPredicate stateEqPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral); + CancelExportStmt stmt = new CancelExportStmt(null, stateEqPredicate); + stmt.analyze(analyzer); + exportMgr.cancelExportJob(stmt); + Assert.assertTrue(job1.getState() == ExportJobState.CANCELLED); + + // cancel export job where state = "EXPORTING" + Assert.assertTrue(job2.getState() == ExportJobState.EXPORTING); + stateStringLiteral = new StringLiteral("EXPORTING"); + stateEqPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral); + stmt = new CancelExportStmt(null, stateEqPredicate); + stmt.analyze(analyzer); + exportMgr.cancelExportJob(stmt); + Assert.assertTrue(job2.getState() == ExportJobState.CANCELLED); + + // cancel export job where state = "FINISHED" + Assert.assertTrue(job3.getState() == ExportJobState.FINISHED); + stateStringLiteral = new StringLiteral("FINISHED"); + stateEqPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral); + stmt = new CancelExportStmt(null, stateEqPredicate); + try { + stmt.analyze(analyzer); + } catch (AnalysisException e) { + Assert.assertTrue(e.getMessage().contains("Only support PENDING/EXPORTING")); + } + + // cancel export job where state = "CANCELLED" + Assert.assertTrue(job4.getState() == ExportJobState.CANCELLED); + stateStringLiteral = new StringLiteral("CANCELLED"); + stateEqPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral); + stmt = new CancelExportStmt(null, stateEqPredicate); + try { + stmt.analyze(analyzer); + } catch (AnalysisException e) { + Assert.assertTrue(e.getMessage().contains("Only support PENDING/EXPORTING")); + } + + ExportJob job5 = new ExportJob(); + job5.setId(5); + job5.setLabel("label_job5"); + exportMgr.unprotectAddJob(job5); + + ExportJob job6 = new ExportJob(); + job6.setId(6); + job6.setLabel("label_job6"); + exportMgr.unprotectAddJob(job6); + + ExportJob job7 = new ExportJob(); + job7.setId(7); + job7.setLabel("label_job7"); + exportMgr.unprotectAddJob(job7); + + ExportJob job8 = new ExportJob(); + job8.setId(8); + job8.setLabel("label_job8"); + exportMgr.unprotectAddJob(job8); + + // cancel export job where label = "label_job5" + Assert.assertTrue(job5.getState() == ExportJobState.PENDING); + SlotRef labelSlotRef = new SlotRef(null, "label"); + StringLiteral labelStringLiteral = new StringLiteral("label_job5"); + BinaryPredicate labelEqPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, labelStringLiteral); + stmt = new CancelExportStmt(null, labelEqPredicate); + stmt.analyze(analyzer); + exportMgr.cancelExportJob(stmt); + Assert.assertTrue(job5.getState() == ExportJobState.CANCELLED); + + // cancel export job where label like "job6" + Assert.assertTrue(job6.getState() == ExportJobState.PENDING); + labelSlotRef = new SlotRef(null, "label"); + labelStringLiteral = new StringLiteral("%job6"); + LikePredicate likePredicate = new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef, labelStringLiteral); + stmt = new CancelExportStmt(null, likePredicate); + stmt.analyze(analyzer); + exportMgr.cancelExportJob(stmt); + Assert.assertTrue(job6.getState() == ExportJobState.CANCELLED); + + // cancel export job where label = "label_job7" AND STATE = "PENDING" + Assert.assertTrue(job7.getState() == ExportJobState.PENDING); + labelSlotRef = new SlotRef(null, "label"); + labelStringLiteral = new StringLiteral("label_job7"); + labelEqPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, labelStringLiteral); + stateStringLiteral = new StringLiteral("PENDING"); + stateEqPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral); + CompoundPredicate andCompoundPredicate = new CompoundPredicate(Operator.AND, labelEqPredicate, stateEqPredicate); + stmt = new CancelExportStmt(null, andCompoundPredicate); + stmt.analyze(analyzer); + exportMgr.cancelExportJob(stmt); + Assert.assertTrue(job7.getState() == ExportJobState.CANCELLED); + + // cancel export job where label like "%job8" OR STATE = "PENDING" + Assert.assertTrue(job8.getState() == ExportJobState.PENDING); + labelSlotRef = new SlotRef(null, "label"); + labelStringLiteral = new StringLiteral("%job8"); + likePredicate = new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef, labelStringLiteral); + stateStringLiteral = new StringLiteral("PENDING"); + stateEqPredicate = + new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral); + CompoundPredicate orCompoundPredicate = new CompoundPredicate(Operator.OR, likePredicate, stateEqPredicate); + stmt = new CancelExportStmt(null, orCompoundPredicate); + stmt.analyze(analyzer); + exportMgr.cancelExportJob(stmt); + Assert.assertTrue(job8.getState() == ExportJobState.CANCELLED); + } } +