Fix the error of duplicated label (#1303)
This commit is contained in:
@ -86,7 +86,7 @@ public class LoadManager implements Writable{
|
||||
LoadJob loadJob = null;
|
||||
writeLock();
|
||||
try {
|
||||
isLabelUsed(dbId, stmt.getLabel().getLabelName());
|
||||
checkLabelUsed(dbId, stmt.getLabel().getLabelName());
|
||||
if (stmt.getBrokerDesc() == null) {
|
||||
throw new DdlException("LoadManager only support the broker load.");
|
||||
}
|
||||
@ -101,6 +101,31 @@ public class LoadManager implements Writable{
|
||||
Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will be invoked by load version1 which is used to check the label of v1 and v2 at the same time.
|
||||
* Step1: lock the load manager
|
||||
* Step2: check the label in load manager
|
||||
* Step3: call the addLoadJob of load class
|
||||
* Step3.1: lock the load
|
||||
* Step3.2: check the label in load
|
||||
* Step3.3: add the loadJob in load rather then load manager
|
||||
* Step3.4: unlock the load
|
||||
* Step4: unlock the load manager
|
||||
* @param stmt
|
||||
* @param timestamp
|
||||
* @throws DdlException
|
||||
*/
|
||||
public void createLoadJobV1FromStmt(LoadStmt stmt, EtlJobType jobType, long timestamp) throws DdlException {
|
||||
Database database = checkDb(stmt.getLabel().getDbName());
|
||||
writeLock();
|
||||
try {
|
||||
checkLabelUsed(database.getId(), stmt.getLabel().getLabelName());
|
||||
Catalog.getCurrentCatalog().getLoadInstance().addLoadJob(stmt, jobType, timestamp);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void replayCreateLoadJob(LoadJob loadJob) {
|
||||
createLoadJob(loadJob);
|
||||
LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
|
||||
@ -316,7 +341,7 @@ public class LoadManager implements Writable{
|
||||
* @param label
|
||||
* @throws DdlException throw exception when label has been used by an unfinished job.
|
||||
*/
|
||||
private void isLabelUsed(long dbId, String label)
|
||||
private void checkLabelUsed(long dbId, String label)
|
||||
throws DdlException {
|
||||
// if label has been used in old load jobs
|
||||
Catalog.getCurrentCatalog().getLoadInstance().isLabelUsed(dbId, label);
|
||||
|
||||
@ -119,7 +119,7 @@ public class DdlExecutor {
|
||||
if (loadStmt.getVersion().equals(LoadManager.VERSION)) {
|
||||
catalog.getLoadManager().createLoadJobFromStmt(loadStmt);
|
||||
} else {
|
||||
catalog.getLoadInstance().addLoadJob(loadStmt, jobType, System.currentTimeMillis());
|
||||
catalog.getLoadManager().createLoadJobV1FromStmt(loadStmt, jobType, System.currentTimeMillis());
|
||||
}
|
||||
} else if (ddlStmt instanceof CancelLoadStmt) {
|
||||
if (catalog.getLoadInstance().isLabelExist(
|
||||
|
||||
@ -18,7 +18,22 @@
|
||||
package org.apache.doris.load.loadv2;
|
||||
|
||||
import mockit.Deencapsulation;
|
||||
import mockit.Expectations;
|
||||
import mockit.Injectable;
|
||||
import mockit.Mocked;
|
||||
|
||||
import org.apache.doris.analysis.LabelName;
|
||||
import org.apache.doris.analysis.LoadStmt;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.LabelAlreadyUsedException;
|
||||
import org.apache.doris.load.EtlJobType;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -28,6 +43,7 @@ import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class LoadManagerTest {
|
||||
@ -41,6 +57,46 @@ public class LoadManagerTest {
|
||||
Deencapsulation.invoke(loadManager, "addLoadJob", job1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateHadoopJob(@Mocked LoadJobScheduler loadJobScheduler,
|
||||
@Injectable LoadStmt stmt,
|
||||
@Injectable LabelName labelName,
|
||||
@Mocked Catalog catalog,
|
||||
@Injectable Database database,
|
||||
@Injectable BrokerLoadJob brokerLoadJob) {
|
||||
Map<Long, Map<String, List<LoadJob>>> dbIdToLabelToLoadJobs = Maps.newHashMap();
|
||||
Map<String, List<LoadJob>> labelToLoadJobs = Maps.newHashMap();
|
||||
String label1 = "label1";
|
||||
List<LoadJob> loadJobs = Lists.newArrayList();
|
||||
loadJobs.add(brokerLoadJob);
|
||||
labelToLoadJobs.put(label1, loadJobs);
|
||||
dbIdToLabelToLoadJobs.put(1L, labelToLoadJobs);
|
||||
loadManager = new LoadManager(loadJobScheduler);
|
||||
Deencapsulation.setField(loadManager, "dbIdToLabelToLoadJobs", dbIdToLabelToLoadJobs);
|
||||
new Expectations() {
|
||||
{
|
||||
stmt.getLabel();
|
||||
result = labelName;
|
||||
labelName.getLabelName();
|
||||
result = "label1";
|
||||
catalog.getDb(anyString);
|
||||
result = database;
|
||||
database.getId();
|
||||
result = 1L;
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
loadManager.createLoadJobV1FromStmt(stmt, EtlJobType.HADOOP, System.currentTimeMillis());
|
||||
Assert.fail("duplicated label is not be allowed");
|
||||
} catch (LabelAlreadyUsedException e) {
|
||||
// successful
|
||||
} catch (DdlException e) {
|
||||
Assert.fail(e.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerializationNormal() throws Exception {
|
||||
File file = serializeToFile(loadManager);
|
||||
|
||||
Reference in New Issue
Block a user