From 6bef41633c5b2c7553adf538aeca2b0eb99763ba Mon Sep 17 00:00:00 2001 From: EmmyMiao87 <522274284@qq.com> Date: Thu, 17 Jan 2019 14:19:13 +0800 Subject: [PATCH] Add DORIS_THIRDPARTY env in docker image (#539) * Add param of specified thirdparty path 1. The thirdparth path can be specify on build.sh: ./build.sh --thirdparty /specified/path/to/thirdparty 2. If there are only thirdparty param of build.sh, it will build both fe and be 3. Add unit test of routine load stmt 4. Remove source code in docker image * Add DORIS_THIRDPARTY env in docker image 1. Set DORIS_THIRDPARTY env in docker image. The build.sh will use /var/local/thirdparty instead of /source/code/thirdparty 2. remove --thirdparty param of build.sh * Change image workdir to /root --- build.sh | 0 docker/Dockerfile | 13 +- .../doris/analysis/CreateRoutineLoadStmt.java | 2 +- .../load/routineload/RoutineLoadManager.java | 2 +- .../analysis/CreateRoutineLoadStmtTest.java | 137 ++++++++++++ .../routineload/KafkaRoutineLoadJobTest.java | 208 +++++++++++++++--- .../routineload/RoutineLoadManagerTest.java | 191 +++++++++++++++- 7 files changed, 511 insertions(+), 42 deletions(-) mode change 100755 => 100644 build.sh create mode 100644 fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java diff --git a/build.sh b/build.sh old mode 100755 new mode 100644 diff --git a/docker/Dockerfile b/docker/Dockerfile index 0151d897b1..7fa77daf3c 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -21,9 +21,6 @@ MAINTAINER tangxiaoqing214445 ENV DEFAULT_DIR /var/local -# change .bashrc -RUN echo -e "if [ ! -d "/var/local/incubator-doris/thirdparty/installed" ]; then\n\tmkdir /var/local/incubator-doris/thirdparty/installed\n\tcp -rf /var/local/thirdparty/installed/* /var/local/incubator-doris/thirdparty/installed/\n\tln -s /var/local/incubator-doris/thirdparty/installed/bin/thrift /usr/bin/thrift \nfi" >> /root/.bashrc - ARG GCC_VERSION=7.3.0 ARG GCC_URL=https://mirrors.ustc.edu.cn/gnu/gcc/gcc-${GCC_VERSION} @@ -75,7 +72,8 @@ RUN touch ${DEFAULT_DIR}/install_jdk.sh \ && echo 'echo "export JAVA_HOME=/usr/java/jdk" >> /root/.bashrc' >> ${DEFAULT_DIR}/install_jdk.sh \ && chmod 777 ${DEFAULT_DIR}/install_jdk.sh \ && /bin/bash ${DEFAULT_DIR}/install_jdk.sh \ - && rm -rf *.rpm + && rm -rf *.rpm \ + && rm ${DEFAULT_DIR}/install_jdk.sh ENV JAVA_HOME /usr/java/jdk @@ -110,7 +108,8 @@ RUN git clone https://github.com/apache/incubator-doris.git \ && rm -rf ${DEFAULT_DIR}/doris-thirdparty.tar.gz \ && rm -rf ${DEFAULT_DIR}/doris-thirdparty \ && mkdir -p ${DEFAULT_DIR}/thirdparty \ - && mv ${DEFAULT_DIR}/incubator-doris/thirdparty/installed ${DEFAULT_DIR}/thirdparty/ - -WORKDIR ${DEFAULT_DIR}/incubator-doris + && mv ${DEFAULT_DIR}/incubator-doris/thirdparty/installed ${DEFAULT_DIR}/thirdparty/ \ + && rm -rf ${DEFAULT_DIR}/incubator-doris +ENV DORIS_THIRDPARTY /var/local/thirdparty +WORKDIR /root diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index fa115f00b9..e84fdc34cc 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -84,7 +84,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions"; private static final String NAME_TYPE = "ROUTINE LOAD NAME"; - private static final String ENDPOINT_REGEX = "([a-z]+\\.*)+:[0-9]+"; + private static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; private static final String EMPTY_STRING = ""; private static final ImmutableSet PROPERTIES_SET = new ImmutableSet.Builder() diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 3b22d891c1..cd32da0393 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -199,7 +199,7 @@ public class RoutineLoadManager { Optional optional = routineLoadJobList.parallelStream() .filter(entity -> entity.getName().equals(name)) .filter(entity -> !entity.getState().isFinalState()).findFirst(); - if (!optional.isPresent()) { + if (optional.isPresent()) { return true; } } diff --git a/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java new file mode 100644 index 0000000000..6b210b1d9c --- /dev/null +++ b/fe/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import mockit.Injectable; +import mockit.Mock; +import mockit.MockUp; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.load.routineload.LoadDataSourceType; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class CreateRoutineLoadStmtTest { + + private static final Logger LOG = LogManager.getLogger(CreateRoutineLoadStmtTest.class); + + @Test + public void testAnalyzeWithDuplicateProperty(@Injectable Analyzer analyzer) throws UserException { + String jobName = "job1"; + String dbName = "db1"; + String tableNameString = "table1"; + String topicName = "topic1"; + String serverAddress = "http://127.0.0.1:8080"; + String kafkaPartitionString = "1,2,3"; + List partitionNameString = Lists.newArrayList(); + partitionNameString.add("p1"); + PartitionNames partitionNames = new PartitionNames(partitionNameString); + ColumnSeparator columnSeparator = new ColumnSeparator(","); + + // duplicate load property + TableName tableName = new TableName(dbName, tableNameString); + List loadPropertyList = new ArrayList<>(); + loadPropertyList.add(columnSeparator); + loadPropertyList.add(columnSeparator); + Map properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); + String typeName = LoadDataSourceType.KAFKA.name(); + Map customProperties = Maps.newHashMap(); + + customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); + customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress); + customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString); + + CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName, + loadPropertyList, properties, + typeName, customProperties); + + new MockUp() { + @Mock + public void analyze(Analyzer analyzer1) { + return; + } + }; + + try { + createRoutineLoadStmt.analyze(analyzer); + Assert.fail(); + } catch (AnalysisException e) { + LOG.info(e.getMessage()); + } + } + + @Test + public void testAnalyze(@Injectable Analyzer analyzer) throws UserException { + String jobName = "job1"; + String dbName = "db1"; + String tableNameString = "table1"; + String topicName = "topic1"; + String serverAddress = "127.0.0.1:8080"; + String kafkaPartitionString = "1,2,3"; + List partitionNameString = Lists.newArrayList(); + partitionNameString.add("p1"); + PartitionNames partitionNames = new PartitionNames(partitionNameString); + ColumnSeparator columnSeparator = new ColumnSeparator(","); + + // duplicate load property + TableName tableName = new TableName(dbName, tableNameString); + List loadPropertyList = new ArrayList<>(); + loadPropertyList.add(columnSeparator); + loadPropertyList.add(partitionNames); + Map properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); + String typeName = LoadDataSourceType.KAFKA.name(); + Map customProperties = Maps.newHashMap(); + + customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); + customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress); + customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString); + + CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName, + loadPropertyList, properties, + typeName, customProperties); + new MockUp() { + @Mock + public void analyze(Analyzer analyzer1) { + return; + } + }; + + createRoutineLoadStmt.analyze(analyzer); + + Assert.assertNotNull(createRoutineLoadStmt.getRoutineLoadDesc()); + Assert.assertEquals(columnSeparator, createRoutineLoadStmt.getRoutineLoadDesc().getColumnSeparator()); + Assert.assertEquals(partitionNames.getPartitionNames(), createRoutineLoadStmt.getRoutineLoadDesc().getPartitionNames()); + Assert.assertEquals(2, createRoutineLoadStmt.getDesiredConcurrentNum()); + Assert.assertEquals(0, createRoutineLoadStmt.getMaxErrorNum()); + Assert.assertEquals(serverAddress, createRoutineLoadStmt.getKafkaEndpoint()); + Assert.assertEquals(topicName, createRoutineLoadStmt.getKafkaTopic()); + Assert.assertEquals(kafkaPartitionString, Joiner.on(",").join(createRoutineLoadStmt.getKafkaPartitions())); + } + +} diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 3d3dc20e0f..272a1663a0 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -17,32 +17,9 @@ package org.apache.doris.load.routineload; -import org.apache.doris.catalog.Catalog; -import org.apache.doris.catalog.Database; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.LabelAlreadyUsedException; -import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.SystemIdGenerator; -import org.apache.doris.load.RoutineLoadDesc; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.system.SystemInfoService; -import org.apache.doris.thrift.TResourceInfo; -import org.apache.doris.transaction.BeginTransactionException; -import org.apache.doris.transaction.GlobalTransactionMgr; -import org.apache.doris.transaction.TransactionState; - +import com.google.common.base.Joiner; import com.google.common.collect.Lists; - -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.PartitionInfo; -import org.junit.Assert; -import org.junit.Test; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - +import com.google.common.collect.Maps; import mockit.Deencapsulation; import mockit.Expectations; import mockit.Injectable; @@ -50,16 +27,71 @@ import mockit.Mock; import mockit.MockUp; import mockit.Mocked; import mockit.Verifications; +import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.CreateRoutineLoadStmt; +import org.apache.doris.analysis.ParseNode; +import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.LoadException; +import org.apache.doris.load.RoutineLoadDesc; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.SystemIdGenerator; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TResourceInfo; +import org.apache.doris.transaction.BeginTransactionException; +import org.apache.doris.transaction.GlobalTransactionMgr; +import org.apache.doris.transaction.TransactionState; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; public class KafkaRoutineLoadJobTest { + private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJobTest.class); + private static final int DEFAULT_TASK_TIMEOUT_SECONDS = 10; + private String jobName = "job1"; + private String dbName = "db1"; + private String tableNameString = "table1"; + private String topicName = "topic1"; + private String serverAddress = "http://127.0.0.1:8080"; + private String kafkaPartitionString = "1,2,3"; + + private PartitionNames partitionNames; + + private ColumnSeparator columnSeparator = new ColumnSeparator(","); + @Mocked ConnectContext connectContext; @Mocked TResourceInfo tResourceInfo; + @Before + public void init() { + List partitionNameString = Lists.newArrayList(); + partitionNameString.add("p1"); + partitionNames = new PartitionNames(partitionNameString); + } + @Test public void testBeNumMin(@Mocked KafkaConsumer kafkaConsumer, @Injectable PartitionInfo partitionInfo1, @@ -96,7 +128,7 @@ public class KafkaRoutineLoadJobTest { KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L, - 1L, routineLoadDesc ,3, 0, + 1L, routineLoadDesc, 3, 0, "", "", null); Assert.assertEquals(1, kafkaRoutineLoadJob.calculateCurrentConcurrentTaskNum()); } @@ -109,7 +141,7 @@ public class KafkaRoutineLoadJobTest { @Mocked RoutineLoadDesc routineLoadDesc) throws BeginTransactionException, LabelAlreadyUsedException, AnalysisException { - new Expectations(){ + new Expectations() { { connectContext.toResourceCtx(); result = tResourceInfo; @@ -118,7 +150,7 @@ public class KafkaRoutineLoadJobTest { KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L, - 1L, routineLoadDesc , 3, 0, + 1L, routineLoadDesc, 3, 0, "", "", null); new Expectations() { @@ -158,7 +190,7 @@ public class KafkaRoutineLoadJobTest { throws AnalysisException, LabelAlreadyUsedException, BeginTransactionException { - new Expectations(){ + new Expectations() { { connectContext.toResourceCtx(); result = tResourceInfo; @@ -167,7 +199,7 @@ public class KafkaRoutineLoadJobTest { RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L, - 1L, routineLoadDesc ,3, 0, + 1L, routineLoadDesc, 3, 0, "", "", null); new Expectations() { { @@ -217,4 +249,120 @@ public class KafkaRoutineLoadJobTest { } }; } + + @Test + public void testFromCreateStmtWithErrorPartition(@Mocked Catalog catalog, + @Injectable Database database, + @Injectable OlapTable table) throws LoadException { + CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt(); + RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, partitionNames.getPartitionNames()); + Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc); + + new Expectations() { + { + catalog.getDb(dbName); + result = database; + database.getTable(tableNameString); + result = table; + table.getPartition("p1"); + result = null; + table.getType(); + result = Table.TableType.OLAP; + } + }; + + try { + KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt); + Assert.fail(); + } catch (AnalysisException e) { + LOG.info(e.getMessage()); + } + } + + @Test + public void testFromCreateStmtWithErrorTable(@Mocked Catalog catalog, + @Injectable Database database) throws LoadException { + CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt(); + RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, partitionNames.getPartitionNames()); + Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc); + + new Expectations() { + { + catalog.getDb(dbName); + result = database; + database.getTable(tableNameString); + result = null; + } + }; + + try { + KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt); + Assert.fail(); + } catch (AnalysisException e) { + LOG.info(e.getMessage()); + } + } + + @Test + public void testFromCreateStmt(@Mocked Catalog catalog, + @Injectable Database database, + @Injectable OlapTable table) throws LoadException, AnalysisException { + CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt(); + RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, partitionNames.getPartitionNames()); + Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc); + List kafkaIntegerList = Lists.newArrayList(); + for (String s : kafkaPartitionString.split(",")) { + kafkaIntegerList.add(Integer.valueOf(s)); + } + Deencapsulation.setField(createRoutineLoadStmt, "kafkaPartitions", kafkaIntegerList); + Deencapsulation.setField(createRoutineLoadStmt, "kafkaEndpoint", serverAddress); + Deencapsulation.setField(createRoutineLoadStmt, "kafkaTopic", topicName); + long dbId = 1l; + long tableId = 2L; + + new Expectations() { + { + catalog.getDb(dbName); + result = database; + database.getTable(tableNameString); + result = table; + database.getId(); + result = dbId; + table.getId(); + result = tableId; + table.getType(); + result = Table.TableType.OLAP; + } + }; + + KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt); + Assert.assertEquals(jobName, kafkaRoutineLoadJob.getName()); + Assert.assertEquals(dbId, kafkaRoutineLoadJob.getDbId()); + Assert.assertEquals(tableId, kafkaRoutineLoadJob.getTableId()); + Assert.assertEquals(serverAddress, Deencapsulation.getField(kafkaRoutineLoadJob, "serverAddress")); + Assert.assertEquals(topicName, Deencapsulation.getField(kafkaRoutineLoadJob, "topic")); + List kafkaPartitionResult = Deencapsulation.getField(kafkaRoutineLoadJob, "kafkaPartitions"); + Assert.assertEquals(kafkaPartitionString, Joiner.on(",").join(kafkaPartitionResult)); + Assert.assertEquals(routineLoadDesc, kafkaRoutineLoadJob.getRoutineLoadDesc()); + } + + private CreateRoutineLoadStmt initCreateRoutineLoadStmt() { + TableName tableName = new TableName(dbName, tableNameString); + List loadPropertyList = new ArrayList<>(); + loadPropertyList.add(columnSeparator); + loadPropertyList.add(partitionNames); + Map properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); + String typeName = LoadDataSourceType.KAFKA.name(); + Map customProperties = Maps.newHashMap(); + + customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); + customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress); + customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString); + + CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName, + loadPropertyList, properties, + typeName, customProperties); + return createRoutineLoadStmt; + } } diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 49f698e898..7f09a84d0c 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -18,31 +18,216 @@ package org.apache.doris.load.routineload; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import mockit.Deencapsulation; import mockit.Expectations; +import mockit.Injectable; import mockit.Mock; import mockit.MockUp; import mockit.Mocked; -import mockit.Verifications; +import org.apache.doris.analysis.ColumnSeparator; +import org.apache.doris.analysis.CreateRoutineLoadStmt; +import org.apache.doris.analysis.ParseNode; +import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import org.apache.doris.common.LoadException; -import org.apache.doris.common.SystemIdGenerator; +import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TResourceInfo; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Queue; +import java.util.UUID; public class RoutineLoadManagerTest { + private static final Logger LOG = LogManager.getLogger(RoutineLoadManagerTest.class); + private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100; @Mocked private SystemInfoService systemInfoService; + @Test + public void testAddJobByStmt(@Injectable PaloAuth paloAuth, + @Injectable TResourceInfo tResourceInfo, + @Mocked ConnectContext connectContext, + @Mocked Catalog catalog) throws DdlException, LoadException, AnalysisException { + String jobName = "job1"; + String dbName = "db1"; + String tableNameString = "table1"; + TableName tableName = new TableName(dbName, tableNameString); + List loadPropertyList = new ArrayList<>(); + ColumnSeparator columnSeparator = new ColumnSeparator(","); + loadPropertyList.add(columnSeparator); + Map properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); + String typeName = LoadDataSourceType.KAFKA.name(); + Map customProperties = Maps.newHashMap(); + String topicName = "topic1"; + customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); + String serverAddress = "http://127.0.0.1:8080"; + customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress); + CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName, + loadPropertyList, properties, + typeName, customProperties); + + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName); + + + new MockUp() { + @Mock + public KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) { + return kafkaRoutineLoadJob; + } + }; + + new Expectations() { + { + catalog.getAuth(); + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, dbName, tableNameString, PrivPredicate.LOAD); + result = true; + } + }; + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + routineLoadManager.addRoutineLoadJob(createRoutineLoadStmt); + + Map idToRoutineLoadJob = + Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob"); + Assert.assertEquals(1, idToRoutineLoadJob.size()); + RoutineLoadJob routineLoadJob = idToRoutineLoadJob.values().iterator().next(); + Assert.assertEquals(1L, routineLoadJob.getDbId()); + Assert.assertEquals(jobName, routineLoadJob.getName()); + Assert.assertEquals(1L, routineLoadJob.getTableId()); + Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULER, routineLoadJob.getState()); + + Map>> dbToNameToRoutineLoadJob = + Deencapsulation.getField(routineLoadManager, "dbToNameToRoutineLoadJob"); + Assert.assertEquals(1, dbToNameToRoutineLoadJob.size()); + Assert.assertEquals(Long.valueOf(1L), dbToNameToRoutineLoadJob.keySet().iterator().next()); + Map> nameToRoutineLoadJob = dbToNameToRoutineLoadJob.get(1L); + Assert.assertEquals(jobName, nameToRoutineLoadJob.keySet().iterator().next()); + Assert.assertEquals(1, nameToRoutineLoadJob.values().size()); + Assert.assertEquals(routineLoadJob, nameToRoutineLoadJob.values().iterator().next().get(0)); + + + } + + @Test + public void testCreateJobAuthDeny(@Injectable PaloAuth paloAuth, + @Injectable TResourceInfo tResourceInfo, + @Mocked ConnectContext connectContext, + @Mocked Catalog catalog) { + String jobName = "job1"; + String dbName = "db1"; + String tableNameString = "table1"; + TableName tableName = new TableName(dbName, tableNameString); + List loadPropertyList = new ArrayList<>(); + ColumnSeparator columnSeparator = new ColumnSeparator(","); + loadPropertyList.add(columnSeparator); + Map properties = Maps.newHashMap(); + properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2"); + String typeName = LoadDataSourceType.KAFKA.name(); + Map customProperties = Maps.newHashMap(); + String topicName = "topic1"; + customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); + String serverAddress = "http://127.0.0.1:8080"; + customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress); + CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName, + loadPropertyList, properties, + typeName, customProperties); + + + new Expectations() { + { + catalog.getAuth(); + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, dbName, tableNameString, PrivPredicate.LOAD); + result = false; + } + }; + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + try { + routineLoadManager.addRoutineLoadJob(createRoutineLoadStmt); + Assert.fail(); + } catch (LoadException | DdlException e) { + Assert.fail(); + } catch (AnalysisException e) { + LOG.info("Access deny"); + } + } + + @Test + public void testCreateWithSameName(@Mocked ConnectContext connectContext) { + String jobName = "job1"; + String topicName = "topic1"; + String serverAddress = "http://127.0.0.1:8080"; + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName); + + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + + Map>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap(); + Map> nameToRoutineLoadJob = Maps.newConcurrentMap(); + List routineLoadJobList = Lists.newArrayList(); + KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName); + routineLoadJobList.add(kafkaRoutineLoadJobWithSameName); + nameToRoutineLoadJob.put(jobName, routineLoadJobList); + dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); + + Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); + try { + routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob); + Assert.fail(); + } catch (DdlException e) { + LOG.info(e.getMessage()); + } + } + + @Test + public void testCreateWithSameNameOfStoppedJob(@Mocked ConnectContext connectContext) throws DdlException { + String jobName = "job1"; + String topicName = "topic1"; + String serverAddress = "http://127.0.0.1:8080"; + KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName); + + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + + Map>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap(); + Map> nameToRoutineLoadJob = Maps.newConcurrentMap(); + List routineLoadJobList = Lists.newArrayList(); + KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName); + Deencapsulation.setField(kafkaRoutineLoadJobWithSameName, "state", RoutineLoadJob.JobState.STOPPED); + routineLoadJobList.add(kafkaRoutineLoadJobWithSameName); + nameToRoutineLoadJob.put(jobName, routineLoadJobList); + dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); + Map idToRoutineLoadJob = Maps.newConcurrentMap(); + idToRoutineLoadJob.put(UUID.randomUUID().toString(), kafkaRoutineLoadJobWithSameName); + + Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); + Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); + routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob); + + Map>> result = + Deencapsulation.getField(routineLoadManager, "dbToNameToRoutineLoadJob"); + Map result1 = Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob"); + Assert.assertEquals(1, result.size()); + Assert.assertEquals(Long.valueOf(1L), result.keySet().iterator().next()); + Map> resultNameToRoutineLoadJob = result.get(1L); + Assert.assertEquals(jobName, resultNameToRoutineLoadJob.keySet().iterator().next()); + Assert.assertEquals(2, resultNameToRoutineLoadJob.values().iterator().next().size()); + Assert.assertEquals(2, result1.values().size()); + } + @Test public void testGetMinTaskBeId() throws LoadException { List beIds = Lists.newArrayList();