diff --git a/build.sh b/build.sh old mode 100755 new mode 100644 diff --git a/docker/Dockerfile b/docker/Dockerfile index 0151d897b1..7fa77daf3c 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -21,9 +21,6 @@ MAINTAINER tangxiaoqing214445 ENV DEFAULT_DIR /var/local -# change .bashrc -RUN echo -e "if [ ! -d "/var/local/incubator-doris/thirdparty/installed" ]; then\n\tmkdir /var/local/incubator-doris/thirdparty/installed\n\tcp -rf /var/local/thirdparty/installed/* /var/local/incubator-doris/thirdparty/installed/\n\tln -s /var/local/incubator-doris/thirdparty/installed/bin/thrift /usr/bin/thrift \nfi" >> /root/.bashrc - ARG GCC_VERSION=7.3.0 ARG GCC_URL=https://mirrors.ustc.edu.cn/gnu/gcc/gcc-${GCC_VERSION} @@ -75,7 +72,8 @@ RUN touch ${DEFAULT_DIR}/install_jdk.sh \ && echo 'echo "export JAVA_HOME=/usr/java/jdk" >> /root/.bashrc' >> ${DEFAULT_DIR}/install_jdk.sh \ && chmod 777 ${DEFAULT_DIR}/install_jdk.sh \ && /bin/bash ${DEFAULT_DIR}/install_jdk.sh \ - && rm -rf *.rpm + && rm -rf *.rpm \ + && rm ${DEFAULT_DIR}/install_jdk.sh ENV JAVA_HOME /usr/java/jdk @@ -110,7 +108,8 @@ RUN git clone https://github.com/apache/incubator-doris.git \ && rm -rf ${DEFAULT_DIR}/doris-thirdparty.tar.gz \ && rm -rf ${DEFAULT_DIR}/doris-thirdparty \ && mkdir -p ${DEFAULT_DIR}/thirdparty \ - && mv ${DEFAULT_DIR}/incubator-doris/thirdparty/installed ${DEFAULT_DIR}/thirdparty/ - -WORKDIR ${DEFAULT_DIR}/incubator-doris + && mv ${DEFAULT_DIR}/incubator-doris/thirdparty/installed ${DEFAULT_DIR}/thirdparty/ \ + && rm -rf ${DEFAULT_DIR}/incubator-doris +ENV DORIS_THIRDPARTY /var/local/thirdparty +WORKDIR /root diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index fa115f00b9..e84fdc34cc 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -84,7 +84,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions"; private static final String NAME_TYPE = "ROUTINE LOAD NAME"; - private static final String ENDPOINT_REGEX = "([a-z]+\\.*)+:[0-9]+"; + private static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; private static final String EMPTY_STRING = ""; private static final ImmutableSet PROPERTIES_SET = new ImmutableSet.Builder() diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 3b22d891c1..cd32da0393 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -199,7 +199,7 @@ public class RoutineLoadManager { Optional optional = routineLoadJobList.parallelStream() .filter(entity -> entity.getName().equals(name)) .filter(entity -> !entity.getState().isFinalState()).findFirst(); - if (!optional.isPresent()) { + if (optional.isPresent()) { return true; } } diff --git a/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java new file mode 100644 index 0000000000..6b210b1d9c --- /dev/null +++ b/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import mockit.Injectable; +import mockit.Mock; +import mockit.MockUp; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.load.routineload.LoadDataSourceType; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class CreateRoutineLoadStmtTest { + + private static final Logger LOG = LogManager.getLogger(CreateRoutineLoadStmtTest.class); + + @Test + public void testAnalyzeWithDuplicateProperty(@Injectable Analyzer analyzer) throws UserException { + String jobName = "job1"; + String dbName = "db1"; + String tableNameString = "table1"; + String topicName = "topic1"; + String serverAddress = "http://127.0.0.1:8080"; + String kafkaPartitionString = "1,2,3"; + List partitionNameString = Lists.newArrayList(); + partitionNameString.add("p1"); + PartitionNames partitionNames = new PartitionNames(partitionNameString); + ColumnSeparator columnSeparator = new ColumnSeparator(","); + + // duplicate load property + TableName tableName = new TableName(dbName, tableNameString); + List loadPropertyList = new ArrayList<>(); + loadPropertyList.add(columnSeparator); + loadPropertyList.add(columnSeparator); + Map properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); + String typeName = LoadDataSourceType.KAFKA.name(); + Map customProperties = Maps.newHashMap(); + + customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); + customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress); + customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString); + + CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName, + loadPropertyList, properties, + typeName, customProperties); + + new MockUp() { + @Mock + public void analyze(Analyzer analyzer1) { + return; + } + }; + + try { + createRoutineLoadStmt.analyze(analyzer); + Assert.fail(); + } catch (AnalysisException e) { + LOG.info(e.getMessage()); + } + } + + @Test + public void testAnalyze(@Injectable Analyzer analyzer) throws UserException { + String jobName = "job1"; + String dbName = "db1"; + String tableNameString = "table1"; + String topicName = "topic1"; + String serverAddress = "127.0.0.1:8080"; + String kafkaPartitionString = "1,2,3"; + List partitionNameString = Lists.newArrayList(); + partitionNameString.add("p1"); + PartitionNames partitionNames = new PartitionNames(partitionNameString); + ColumnSeparator columnSeparator = new ColumnSeparator(","); + + // duplicate load property + TableName tableName = new TableName(dbName, tableNameString); + List loadPropertyList = new ArrayList<>(); + loadPropertyList.add(columnSeparator); + loadPropertyList.add(partitionNames); + Map properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); + String typeName = LoadDataSourceType.KAFKA.name(); + Map customProperties = Maps.newHashMap(); + + customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); + customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress); + customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString); + + CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName, + loadPropertyList, properties, + typeName, customProperties); + new MockUp() { + @Mock + public void analyze(Analyzer analyzer1) { + return; + } + }; + + createRoutineLoadStmt.analyze(analyzer); + + Assert.assertNotNull(createRoutineLoadStmt.getRoutineLoadDesc()); + Assert.assertEquals(columnSeparator, createRoutineLoadStmt.getRoutineLoadDesc().getColumnSeparator()); + Assert.assertEquals(partitionNames.getPartitionNames(), createRoutineLoadStmt.getRoutineLoadDesc().getPartitionNames()); + Assert.assertEquals(2, createRoutineLoadStmt.getDesiredConcurrentNum()); + Assert.assertEquals(0, createRoutineLoadStmt.getMaxErrorNum()); + Assert.assertEquals(serverAddress, createRoutineLoadStmt.getKafkaEndpoint()); + Assert.assertEquals(topicName, createRoutineLoadStmt.getKafkaTopic()); + Assert.assertEquals(kafkaPartitionString, Joiner.on(",").join(createRoutineLoadStmt.getKafkaPartitions())); + } + +} diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 3d3dc20e0f..272a1663a0 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -17,32 +17,9 @@ package org.apache.doris.load.routineload; -import org.apache.doris.catalog.Catalog; -import org.apache.doris.catalog.Database; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.LabelAlreadyUsedException; -import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.SystemIdGenerator; -import org.apache.doris.load.RoutineLoadDesc; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.system.SystemInfoService; -import org.apache.doris.thrift.TResourceInfo; -import org.apache.doris.transaction.BeginTransactionException; -import org.apache.doris.transaction.GlobalTransactionMgr; -import org.apache.doris.transaction.TransactionState; - +import com.google.common.base.Joiner; import com.google.common.collect.Lists; - -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.junit.Assert; -import org.junit.Test; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - +import com.google.common.collect.Maps; import mockit.Deencapsulation; import mockit.Expectations; import mockit.Injectable; @@ -50,16 +27,71 @@ import mockit.Mock; import mockit.MockUp; import mockit.Mocked; import mockit.Verifications; +import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.CreateRoutineLoadStmt; +import org.apache.doris.analysis.ParseNode; +import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.LoadException; +import org.apache.doris.load.RoutineLoadDesc; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.SystemIdGenerator; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TResourceInfo; +import org.apache.doris.transaction.BeginTransactionException; +import org.apache.doris.transaction.GlobalTransactionMgr; +import org.apache.doris.transaction.TransactionState; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; public class KafkaRoutineLoadJobTest { + private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJobTest.class); + private static final int DEFAULT_TASK_TIMEOUT_SECONDS = 10; + private String jobName = "job1"; + private String dbName = "db1"; + private String tableNameString = "table1"; + private String topicName = "topic1"; + private String serverAddress = "http://127.0.0.1:8080"; + private String kafkaPartitionString = "1,2,3"; + + private PartitionNames partitionNames; + + private ColumnSeparator columnSeparator = new ColumnSeparator(","); + @Mocked ConnectContext connectContext; @Mocked TResourceInfo tResourceInfo; + @Before + public void init() { + List partitionNameString = Lists.newArrayList(); + partitionNameString.add("p1"); + partitionNames = new PartitionNames(partitionNameString); + } + @Test public void testBeNumMin(@Mocked KafkaConsumer kafkaConsumer, @Injectable PartitionInfo partitionInfo1, @@ -96,7 +128,7 @@ public class KafkaRoutineLoadJobTest { KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L, - 1L, routineLoadDesc ,3, 0, + 1L, routineLoadDesc, 3, 0, "", "", null); Assert.assertEquals(1, kafkaRoutineLoadJob.calculateCurrentConcurrentTaskNum()); } @@ -109,7 +141,7 @@ public class KafkaRoutineLoadJobTest { @Mocked RoutineLoadDesc routineLoadDesc) throws BeginTransactionException, LabelAlreadyUsedException, AnalysisException { - new Expectations(){ + new Expectations() { { connectContext.toResourceCtx(); result = tResourceInfo; @@ -118,7 +150,7 @@ public class KafkaRoutineLoadJobTest { KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L, - 1L, routineLoadDesc , 3, 0, + 1L, routineLoadDesc, 3, 0, "", "", null); new Expectations() { @@ -158,7 +190,7 @@ public class KafkaRoutineLoadJobTest { throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException { - new Expectations(){ + new Expectations() { { connectContext.toResourceCtx(); result = tResourceInfo; @@ -167,7 +199,7 @@ public class KafkaRoutineLoadJobTest { RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L, - 1L, routineLoadDesc ,3, 0, + 1L, routineLoadDesc, 3, 0, "", "", null); new Expectations() { { @@ -217,4 +249,120 @@ public class KafkaRoutineLoadJobTest { } }; } + + @Test + public void testFromCreateStmtWithErrorPartition(@Mocked Catalog catalog, + @Injectable Database database, + @Injectable OlapTable table) throws LoadException { + CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt(); + RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, partitionNames.getPartitionNames()); + Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc); + + new Expectations() { + { + catalog.getDb(dbName); + result = database; + database.getTable(tableNameString); + result = table; + table.getPartition("p1"); + result = null; + table.getType(); + result = Table.TableType.OLAP; + } + }; + + try { + KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt); + Assert.fail(); + } catch (AnalysisException e) { + LOG.info(e.getMessage()); + } + } + + @Test + public void testFromCreateStmtWithErrorTable(@Mocked Catalog catalog, + @Injectable Database database) throws LoadException { + CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt(); + RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, partitionNames.getPartitionNames()); + Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc); + + new Expectations() { + { + catalog.getDb(dbName); + result = database; + database.getTable(tableNameString); + result = null; + } + }; + + try { + KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt); + Assert.fail(); + } catch (AnalysisException e) { + LOG.info(e.getMessage()); + } + } + + @Test + public void testFromCreateStmt(@Mocked Catalog catalog, + @Injectable Database database, + @Injectable OlapTable table) throws LoadException, AnalysisException { + CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt(); + RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, partitionNames.getPartitionNames()); + Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc); + List kafkaIntegerList = Lists.newArrayList(); + for (String s : kafkaPartitionString.split(",")) { + kafkaIntegerList.add(Integer.valueOf(s)); + } + Deencapsulation.setField(createRoutineLoadStmt, "kafkaPartitions", kafkaIntegerList); + Deencapsulation.setField(createRoutineLoadStmt, "kafkaEndpoint", serverAddress); + Deencapsulation.setField(createRoutineLoadStmt, "kafkaTopic", topicName); + long dbId = 1l; + long tableId = 2L; + + new Expectations() { + { + catalog.getDb(dbName); + result = database; + database.getTable(tableNameString); + result = table; + database.getId(); + result = dbId; + table.getId(); + result = tableId; + table.getType(); + result = Table.TableType.OLAP; + } + }; + + KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt); + Assert.assertEquals(jobName, kafkaRoutineLoadJob.getName()); + Assert.assertEquals(dbId, kafkaRoutineLoadJob.getDbId()); + Assert.assertEquals(tableId, kafkaRoutineLoadJob.getTableId()); + Assert.assertEquals(serverAddress, Deencapsulation.getField(kafkaRoutineLoadJob, "serverAddress")); + Assert.assertEquals(topicName, Deencapsulation.getField(kafkaRoutineLoadJob, "topic")); + List kafkaPartitionResult = Deencapsulation.getField(kafkaRoutineLoadJob, "kafkaPartitions"); + Assert.assertEquals(kafkaPartitionString, Joiner.on(",").join(kafkaPartitionResult)); + Assert.assertEquals(routineLoadDesc, kafkaRoutineLoadJob.getRoutineLoadDesc()); + } + + private CreateRoutineLoadStmt initCreateRoutineLoadStmt() { + TableName tableName = new TableName(dbName, tableNameString); + List loadPropertyList = new ArrayList<>(); + loadPropertyList.add(columnSeparator); + loadPropertyList.add(partitionNames); + Map properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); + String typeName = LoadDataSourceType.KAFKA.name(); + Map customProperties = Maps.newHashMap(); + + customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); + customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress); + customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString); + + CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName, + loadPropertyList, properties, + typeName, customProperties); + return createRoutineLoadStmt; + } } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 49f698e898..7f09a84d0c 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -18,31 +18,216 @@ package org.apache.doris.load.routineload; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import mockit.Deencapsulation; import mockit.Expectations; +import mockit.Injectable; import mockit.Mock; import mockit.MockUp; import mockit.Mocked; -import mockit.Verifications; +import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.CreateRoutineLoadStmt; +import org.apache.doris.analysis.ParseNode; +import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import org.apache.doris.common.LoadException; -import org.apache.doris.common.SystemIdGenerator; +import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TResourceInfo; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Queue; +import java.util.UUID; public class RoutineLoadManagerTest { + private static final Logger LOG = LogManager.getLogger(RoutineLoadManagerTest.class); + private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100; @Mocked private SystemInfoService systemInfoService; + @Test + public void testAddJobByStmt(@Injectable PaloAuth paloAuth, + @Injectable TResourceInfo tResourceInfo, + @Mocked ConnectContext connectContext, + @Mocked Catalog catalog) throws DdlException, LoadException, AnalysisException { + String jobName = "job1"; + String dbName = "db1"; + String tableNameString = "table1"; + TableName tableName = new TableName(dbName, tableNameString); + List loadPropertyList = new ArrayList<>(); + ColumnSeparator columnSeparator = new ColumnSeparator(","); + loadPropertyList.add(columnSeparator); + Map properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); + String typeName = LoadDataSourceType.KAFKA.name(); + Map customProperties = Maps.newHashMap(); + String topicName = "topic1"; + customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); + String serverAddress = "http://127.0.0.1:8080"; + customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress); + CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName, + loadPropertyList, properties, + typeName, customProperties); + + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName); + + + new MockUp() { + @Mock + public KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) { + return kafkaRoutineLoadJob; + } + }; + + new Expectations() { + { + catalog.getAuth(); + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, dbName, tableNameString, PrivPredicate.LOAD); + result = true; + } + }; + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + routineLoadManager.addRoutineLoadJob(createRoutineLoadStmt); + + Map idToRoutineLoadJob = + Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob"); + Assert.assertEquals(1, idToRoutineLoadJob.size()); + RoutineLoadJob routineLoadJob = idToRoutineLoadJob.values().iterator().next(); + Assert.assertEquals(1L, routineLoadJob.getDbId()); + Assert.assertEquals(jobName, routineLoadJob.getName()); + Assert.assertEquals(1L, routineLoadJob.getTableId()); + Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULER, routineLoadJob.getState()); + + Map>> dbToNameToRoutineLoadJob = + Deencapsulation.getField(routineLoadManager, "dbToNameToRoutineLoadJob"); + Assert.assertEquals(1, dbToNameToRoutineLoadJob.size()); + Assert.assertEquals(Long.valueOf(1L), dbToNameToRoutineLoadJob.keySet().iterator().next()); + Map> nameToRoutineLoadJob = dbToNameToRoutineLoadJob.get(1L); + Assert.assertEquals(jobName, nameToRoutineLoadJob.keySet().iterator().next()); + Assert.assertEquals(1, nameToRoutineLoadJob.values().size()); + Assert.assertEquals(routineLoadJob, nameToRoutineLoadJob.values().iterator().next().get(0)); + + + } + + @Test + public void testCreateJobAuthDeny(@Injectable PaloAuth paloAuth, + @Injectable TResourceInfo tResourceInfo, + @Mocked ConnectContext connectContext, + @Mocked Catalog catalog) { + String jobName = "job1"; + String dbName = "db1"; + String tableNameString = "table1"; + TableName tableName = new TableName(dbName, tableNameString); + List loadPropertyList = new ArrayList<>(); + ColumnSeparator columnSeparator = new ColumnSeparator(","); + loadPropertyList.add(columnSeparator); + Map properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); + String typeName = LoadDataSourceType.KAFKA.name(); + Map customProperties = Maps.newHashMap(); + String topicName = "topic1"; + customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); + String serverAddress = "http://127.0.0.1:8080"; + customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress); + CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName, + loadPropertyList, properties, + typeName, customProperties); + + + new Expectations() { + { + catalog.getAuth(); + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, dbName, tableNameString, PrivPredicate.LOAD); + result = false; + } + }; + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + try { + routineLoadManager.addRoutineLoadJob(createRoutineLoadStmt); + Assert.fail(); + } catch (LoadException | DdlException e) { + Assert.fail(); + } catch (AnalysisException e) { + LOG.info("Access deny"); + } + } + + @Test + public void testCreateWithSameName(@Mocked ConnectContext connectContext) { + String jobName = "job1"; + String topicName = "topic1"; + String serverAddress = "http://127.0.0.1:8080"; + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName); + + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + + Map>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap(); + Map> nameToRoutineLoadJob = Maps.newConcurrentMap(); + List routineLoadJobList = Lists.newArrayList(); + KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName); + routineLoadJobList.add(kafkaRoutineLoadJobWithSameName); + nameToRoutineLoadJob.put(jobName, routineLoadJobList); + dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); + + Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); + try { + routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob); + Assert.fail(); + } catch (DdlException e) { + LOG.info(e.getMessage()); + } + } + + @Test + public void testCreateWithSameNameOfStoppedJob(@Mocked ConnectContext connectContext) throws DdlException { + String jobName = "job1"; + String topicName = "topic1"; + String serverAddress = "http://127.0.0.1:8080"; + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName); + + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + + Map>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap(); + Map> nameToRoutineLoadJob = Maps.newConcurrentMap(); + List routineLoadJobList = Lists.newArrayList(); + KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName); + Deencapsulation.setField(kafkaRoutineLoadJobWithSameName, "state", RoutineLoadJob.JobState.STOPPED); + routineLoadJobList.add(kafkaRoutineLoadJobWithSameName); + nameToRoutineLoadJob.put(jobName, routineLoadJobList); + dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); + Map idToRoutineLoadJob = Maps.newConcurrentMap(); + idToRoutineLoadJob.put(UUID.randomUUID().toString(), kafkaRoutineLoadJobWithSameName); + + Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); + Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); + routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob); + + Map>> result = + Deencapsulation.getField(routineLoadManager, "dbToNameToRoutineLoadJob"); + Map result1 = Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob"); + Assert.assertEquals(1, result.size()); + Assert.assertEquals(Long.valueOf(1L), result.keySet().iterator().next()); + Map> resultNameToRoutineLoadJob = result.get(1L); + Assert.assertEquals(jobName, resultNameToRoutineLoadJob.keySet().iterator().next()); + Assert.assertEquals(2, resultNameToRoutineLoadJob.values().iterator().next().size()); + Assert.assertEquals(2, result1.values().size()); + } + @Test public void testGetMinTaskBeId() throws LoadException { List beIds = Lists.newArrayList();