diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index d1b2989d7a..13dc66eee7 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -86,7 +86,7 @@ public class LoadManager implements Writable{ LoadJob loadJob = null; writeLock(); try { - isLabelUsed(dbId, stmt.getLabel().getLabelName()); + checkLabelUsed(dbId, stmt.getLabel().getLabelName()); if (stmt.getBrokerDesc() == null) { throw new DdlException("LoadManager only support the broker load."); } @@ -101,6 +101,31 @@ public class LoadManager implements Writable{ Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob); } + /** + * This method will be invoked by load version1 which is used to check the label of v1 and v2 at the same time. + * Step1: lock the load manager + * Step2: check the label in load manager + * Step3: call the addLoadJob of load class + * Step3.1: lock the load + * Step3.2: check the label in load + * Step3.3: add the loadJob in load rather then load manager + * Step3.4: unlock the load + * Step4: unlock the load manager + * @param stmt + * @param timestamp + * @throws DdlException + */ + public void createLoadJobV1FromStmt(LoadStmt stmt, EtlJobType jobType, long timestamp) throws DdlException { + Database database = checkDb(stmt.getLabel().getDbName()); + writeLock(); + try { + checkLabelUsed(database.getId(), stmt.getLabel().getLabelName()); + Catalog.getCurrentCatalog().getLoadInstance().addLoadJob(stmt, jobType, timestamp); + } finally { + writeUnlock(); + } + } + public void replayCreateLoadJob(LoadJob loadJob) { createLoadJob(loadJob); LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()) @@ -316,7 +341,7 @@ public class LoadManager implements Writable{ * @param label * @throws DdlException throw exception when label has been used by an unfinished job. */ - private void isLabelUsed(long dbId, String label) + private void checkLabelUsed(long dbId, String label) throws DdlException { // if label has been used in old load jobs Catalog.getCurrentCatalog().getLoadInstance().isLabelUsed(dbId, label); diff --git a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java index 6f775e6d6c..72ee32dc45 100644 --- a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -119,7 +119,7 @@ public class DdlExecutor { if (loadStmt.getVersion().equals(LoadManager.VERSION)) { catalog.getLoadManager().createLoadJobFromStmt(loadStmt); } else { - catalog.getLoadInstance().addLoadJob(loadStmt, jobType, System.currentTimeMillis()); + catalog.getLoadManager().createLoadJobV1FromStmt(loadStmt, jobType, System.currentTimeMillis()); } } else if (ddlStmt instanceof CancelLoadStmt) { if (catalog.getLoadInstance().isLabelExist( diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java index c4f795c849..a5986fbfc8 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java @@ -18,7 +18,22 @@ package org.apache.doris.load.loadv2; import mockit.Deencapsulation; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; + +import org.apache.doris.analysis.LabelName; +import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.load.EtlJobType; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -28,6 +43,7 @@ import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.util.List; import java.util.Map; public class LoadManagerTest { @@ -41,6 +57,46 @@ public class LoadManagerTest { Deencapsulation.invoke(loadManager, "addLoadJob", job1); } + @Test + public void testCreateHadoopJob(@Mocked LoadJobScheduler loadJobScheduler, + @Injectable LoadStmt stmt, + @Injectable LabelName labelName, + @Mocked Catalog catalog, + @Injectable Database database, + @Injectable BrokerLoadJob brokerLoadJob) { + Map>> dbIdToLabelToLoadJobs = Maps.newHashMap(); + Map> labelToLoadJobs = Maps.newHashMap(); + String label1 = "label1"; + List loadJobs = Lists.newArrayList(); + loadJobs.add(brokerLoadJob); + labelToLoadJobs.put(label1, loadJobs); + dbIdToLabelToLoadJobs.put(1L, labelToLoadJobs); + loadManager = new LoadManager(loadJobScheduler); + Deencapsulation.setField(loadManager, "dbIdToLabelToLoadJobs", dbIdToLabelToLoadJobs); + new Expectations() { + { + stmt.getLabel(); + result = labelName; + labelName.getLabelName(); + result = "label1"; + catalog.getDb(anyString); + result = database; + database.getId(); + result = 1L; + } + }; + + try { + loadManager.createLoadJobV1FromStmt(stmt, EtlJobType.HADOOP, System.currentTimeMillis()); + Assert.fail("duplicated label is not be allowed"); + } catch (LabelAlreadyUsedException e) { + // successful + } catch (DdlException e) { + Assert.fail(e.getMessage()); + } + + } + @Test public void testSerializationNormal() throws Exception { File file = serializeToFile(loadManager);