Add DORIS_THIRDPARTY env in docker image (#539)
* Add param of specified thirdparty path 1. The thirdparth path can be specify on build.sh: ./build.sh --thirdparty /specified/path/to/thirdparty 2. If there are only thirdparty param of build.sh, it will build both fe and be 3. Add unit test of routine load stmt 4. Remove source code in docker image * Add DORIS_THIRDPARTY env in docker image 1. Set DORIS_THIRDPARTY env in docker image. The build.sh will use /var/local/thirdparty instead of /source/code/thirdparty 2. remove --thirdparty param of build.sh * Change image workdir to /root
This commit is contained in:
@ -21,9 +21,6 @@ MAINTAINER tangxiaoqing214445
|
||||
|
||||
ENV DEFAULT_DIR /var/local
|
||||
|
||||
# change .bashrc
|
||||
RUN echo -e "if [ ! -d "/var/local/incubator-doris/thirdparty/installed" ]; then\n\tmkdir /var/local/incubator-doris/thirdparty/installed\n\tcp -rf /var/local/thirdparty/installed/* /var/local/incubator-doris/thirdparty/installed/\n\tln -s /var/local/incubator-doris/thirdparty/installed/bin/thrift /usr/bin/thrift \nfi" >> /root/.bashrc
|
||||
|
||||
ARG GCC_VERSION=7.3.0
|
||||
ARG GCC_URL=https://mirrors.ustc.edu.cn/gnu/gcc/gcc-${GCC_VERSION}
|
||||
|
||||
@ -75,7 +72,8 @@ RUN touch ${DEFAULT_DIR}/install_jdk.sh \
|
||||
&& echo 'echo "export JAVA_HOME=/usr/java/jdk" >> /root/.bashrc' >> ${DEFAULT_DIR}/install_jdk.sh \
|
||||
&& chmod 777 ${DEFAULT_DIR}/install_jdk.sh \
|
||||
&& /bin/bash ${DEFAULT_DIR}/install_jdk.sh \
|
||||
&& rm -rf *.rpm
|
||||
&& rm -rf *.rpm \
|
||||
&& rm ${DEFAULT_DIR}/install_jdk.sh
|
||||
|
||||
ENV JAVA_HOME /usr/java/jdk
|
||||
|
||||
@ -110,7 +108,8 @@ RUN git clone https://github.com/apache/incubator-doris.git \
|
||||
&& rm -rf ${DEFAULT_DIR}/doris-thirdparty.tar.gz \
|
||||
&& rm -rf ${DEFAULT_DIR}/doris-thirdparty \
|
||||
&& mkdir -p ${DEFAULT_DIR}/thirdparty \
|
||||
&& mv ${DEFAULT_DIR}/incubator-doris/thirdparty/installed ${DEFAULT_DIR}/thirdparty/
|
||||
|
||||
WORKDIR ${DEFAULT_DIR}/incubator-doris
|
||||
&& mv ${DEFAULT_DIR}/incubator-doris/thirdparty/installed ${DEFAULT_DIR}/thirdparty/ \
|
||||
&& rm -rf ${DEFAULT_DIR}/incubator-doris
|
||||
|
||||
ENV DORIS_THIRDPARTY /var/local/thirdparty
|
||||
WORKDIR /root
|
||||
|
||||
@ -84,7 +84,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
|
||||
public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions";
|
||||
|
||||
private static final String NAME_TYPE = "ROUTINE LOAD NAME";
|
||||
private static final String ENDPOINT_REGEX = "([a-z]+\\.*)+:[0-9]+";
|
||||
private static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
|
||||
private static final String EMPTY_STRING = "";
|
||||
|
||||
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
|
||||
|
||||
@ -199,7 +199,7 @@ public class RoutineLoadManager {
|
||||
Optional<RoutineLoadJob> optional = routineLoadJobList.parallelStream()
|
||||
.filter(entity -> entity.getName().equals(name))
|
||||
.filter(entity -> !entity.getState().isFinalState()).findFirst();
|
||||
if (!optional.isPresent()) {
|
||||
if (optional.isPresent()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,137 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import mockit.Injectable;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.load.routineload.LoadDataSourceType;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class CreateRoutineLoadStmtTest {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(CreateRoutineLoadStmtTest.class);
|
||||
|
||||
@Test
|
||||
public void testAnalyzeWithDuplicateProperty(@Injectable Analyzer analyzer) throws UserException {
|
||||
String jobName = "job1";
|
||||
String dbName = "db1";
|
||||
String tableNameString = "table1";
|
||||
String topicName = "topic1";
|
||||
String serverAddress = "http://127.0.0.1:8080";
|
||||
String kafkaPartitionString = "1,2,3";
|
||||
List<String> partitionNameString = Lists.newArrayList();
|
||||
partitionNameString.add("p1");
|
||||
PartitionNames partitionNames = new PartitionNames(partitionNameString);
|
||||
ColumnSeparator columnSeparator = new ColumnSeparator(",");
|
||||
|
||||
// duplicate load property
|
||||
TableName tableName = new TableName(dbName, tableNameString);
|
||||
List<ParseNode> loadPropertyList = new ArrayList<>();
|
||||
loadPropertyList.add(columnSeparator);
|
||||
loadPropertyList.add(columnSeparator);
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
|
||||
String typeName = LoadDataSourceType.KAFKA.name();
|
||||
Map<String, String> customProperties = Maps.newHashMap();
|
||||
|
||||
customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
|
||||
customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
|
||||
customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
|
||||
|
||||
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
|
||||
loadPropertyList, properties,
|
||||
typeName, customProperties);
|
||||
|
||||
new MockUp<StatementBase>() {
|
||||
@Mock
|
||||
public void analyze(Analyzer analyzer1) {
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
createRoutineLoadStmt.analyze(analyzer);
|
||||
Assert.fail();
|
||||
} catch (AnalysisException e) {
|
||||
LOG.info(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAnalyze(@Injectable Analyzer analyzer) throws UserException {
|
||||
String jobName = "job1";
|
||||
String dbName = "db1";
|
||||
String tableNameString = "table1";
|
||||
String topicName = "topic1";
|
||||
String serverAddress = "127.0.0.1:8080";
|
||||
String kafkaPartitionString = "1,2,3";
|
||||
List<String> partitionNameString = Lists.newArrayList();
|
||||
partitionNameString.add("p1");
|
||||
PartitionNames partitionNames = new PartitionNames(partitionNameString);
|
||||
ColumnSeparator columnSeparator = new ColumnSeparator(",");
|
||||
|
||||
// duplicate load property
|
||||
TableName tableName = new TableName(dbName, tableNameString);
|
||||
List<ParseNode> loadPropertyList = new ArrayList<>();
|
||||
loadPropertyList.add(columnSeparator);
|
||||
loadPropertyList.add(partitionNames);
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
|
||||
String typeName = LoadDataSourceType.KAFKA.name();
|
||||
Map<String, String> customProperties = Maps.newHashMap();
|
||||
|
||||
customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
|
||||
customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
|
||||
customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
|
||||
|
||||
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
|
||||
loadPropertyList, properties,
|
||||
typeName, customProperties);
|
||||
new MockUp<StatementBase>() {
|
||||
@Mock
|
||||
public void analyze(Analyzer analyzer1) {
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
createRoutineLoadStmt.analyze(analyzer);
|
||||
|
||||
Assert.assertNotNull(createRoutineLoadStmt.getRoutineLoadDesc());
|
||||
Assert.assertEquals(columnSeparator, createRoutineLoadStmt.getRoutineLoadDesc().getColumnSeparator());
|
||||
Assert.assertEquals(partitionNames.getPartitionNames(), createRoutineLoadStmt.getRoutineLoadDesc().getPartitionNames());
|
||||
Assert.assertEquals(2, createRoutineLoadStmt.getDesiredConcurrentNum());
|
||||
Assert.assertEquals(0, createRoutineLoadStmt.getMaxErrorNum());
|
||||
Assert.assertEquals(serverAddress, createRoutineLoadStmt.getKafkaEndpoint());
|
||||
Assert.assertEquals(topicName, createRoutineLoadStmt.getKafkaTopic());
|
||||
Assert.assertEquals(kafkaPartitionString, Joiner.on(",").join(createRoutineLoadStmt.getKafkaPartitions()));
|
||||
}
|
||||
|
||||
}
|
||||
@ -17,32 +17,9 @@
|
||||
|
||||
package org.apache.doris.load.routineload;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.LabelAlreadyUsedException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.SystemIdGenerator;
|
||||
import org.apache.doris.load.RoutineLoadDesc;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TResourceInfo;
|
||||
import org.apache.doris.transaction.BeginTransactionException;
|
||||
import org.apache.doris.transaction.GlobalTransactionMgr;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import mockit.Deencapsulation;
|
||||
import mockit.Expectations;
|
||||
import mockit.Injectable;
|
||||
@ -50,16 +27,71 @@ import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import mockit.Mocked;
|
||||
import mockit.Verifications;
|
||||
import org.apache.doris.analysis.ColumnSeparator;
|
||||
import org.apache.doris.analysis.CreateRoutineLoadStmt;
|
||||
import org.apache.doris.analysis.ParseNode;
|
||||
import org.apache.doris.analysis.PartitionNames;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.LoadException;
|
||||
import org.apache.doris.load.RoutineLoadDesc;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.LabelAlreadyUsedException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.SystemIdGenerator;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TResourceInfo;
|
||||
import org.apache.doris.transaction.BeginTransactionException;
|
||||
import org.apache.doris.transaction.GlobalTransactionMgr;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class KafkaRoutineLoadJobTest {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(KafkaRoutineLoadJobTest.class);
|
||||
|
||||
private static final int DEFAULT_TASK_TIMEOUT_SECONDS = 10;
|
||||
|
||||
private String jobName = "job1";
|
||||
private String dbName = "db1";
|
||||
private String tableNameString = "table1";
|
||||
private String topicName = "topic1";
|
||||
private String serverAddress = "http://127.0.0.1:8080";
|
||||
private String kafkaPartitionString = "1,2,3";
|
||||
|
||||
private PartitionNames partitionNames;
|
||||
|
||||
private ColumnSeparator columnSeparator = new ColumnSeparator(",");
|
||||
|
||||
@Mocked
|
||||
ConnectContext connectContext;
|
||||
@Mocked
|
||||
TResourceInfo tResourceInfo;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
List<String> partitionNameString = Lists.newArrayList();
|
||||
partitionNameString.add("p1");
|
||||
partitionNames = new PartitionNames(partitionNameString);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBeNumMin(@Mocked KafkaConsumer kafkaConsumer,
|
||||
@Injectable PartitionInfo partitionInfo1,
|
||||
@ -96,7 +128,7 @@ public class KafkaRoutineLoadJobTest {
|
||||
|
||||
KafkaRoutineLoadJob kafkaRoutineLoadJob =
|
||||
new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L,
|
||||
1L, routineLoadDesc ,3, 0,
|
||||
1L, routineLoadDesc, 3, 0,
|
||||
"", "", null);
|
||||
Assert.assertEquals(1, kafkaRoutineLoadJob.calculateCurrentConcurrentTaskNum());
|
||||
}
|
||||
@ -109,7 +141,7 @@ public class KafkaRoutineLoadJobTest {
|
||||
@Mocked RoutineLoadDesc routineLoadDesc)
|
||||
throws BeginTransactionException, LabelAlreadyUsedException, AnalysisException {
|
||||
|
||||
new Expectations(){
|
||||
new Expectations() {
|
||||
{
|
||||
connectContext.toResourceCtx();
|
||||
result = tResourceInfo;
|
||||
@ -118,7 +150,7 @@ public class KafkaRoutineLoadJobTest {
|
||||
|
||||
KafkaRoutineLoadJob kafkaRoutineLoadJob =
|
||||
new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L,
|
||||
1L, routineLoadDesc , 3, 0,
|
||||
1L, routineLoadDesc, 3, 0,
|
||||
"", "", null);
|
||||
|
||||
new Expectations() {
|
||||
@ -158,7 +190,7 @@ public class KafkaRoutineLoadJobTest {
|
||||
throws AnalysisException, LabelAlreadyUsedException,
|
||||
BeginTransactionException {
|
||||
|
||||
new Expectations(){
|
||||
new Expectations() {
|
||||
{
|
||||
connectContext.toResourceCtx();
|
||||
result = tResourceInfo;
|
||||
@ -167,7 +199,7 @@ public class KafkaRoutineLoadJobTest {
|
||||
|
||||
RoutineLoadJob routineLoadJob =
|
||||
new KafkaRoutineLoadJob("1", "kafka_routine_load_job", 1L,
|
||||
1L, routineLoadDesc ,3, 0,
|
||||
1L, routineLoadDesc, 3, 0,
|
||||
"", "", null);
|
||||
new Expectations() {
|
||||
{
|
||||
@ -217,4 +249,120 @@ public class KafkaRoutineLoadJobTest {
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFromCreateStmtWithErrorPartition(@Mocked Catalog catalog,
|
||||
@Injectable Database database,
|
||||
@Injectable OlapTable table) throws LoadException {
|
||||
CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt();
|
||||
RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, partitionNames.getPartitionNames());
|
||||
Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc);
|
||||
|
||||
new Expectations() {
|
||||
{
|
||||
catalog.getDb(dbName);
|
||||
result = database;
|
||||
database.getTable(tableNameString);
|
||||
result = table;
|
||||
table.getPartition("p1");
|
||||
result = null;
|
||||
table.getType();
|
||||
result = Table.TableType.OLAP;
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt);
|
||||
Assert.fail();
|
||||
} catch (AnalysisException e) {
|
||||
LOG.info(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFromCreateStmtWithErrorTable(@Mocked Catalog catalog,
|
||||
@Injectable Database database) throws LoadException {
|
||||
CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt();
|
||||
RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, partitionNames.getPartitionNames());
|
||||
Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc);
|
||||
|
||||
new Expectations() {
|
||||
{
|
||||
catalog.getDb(dbName);
|
||||
result = database;
|
||||
database.getTable(tableNameString);
|
||||
result = null;
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt);
|
||||
Assert.fail();
|
||||
} catch (AnalysisException e) {
|
||||
LOG.info(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFromCreateStmt(@Mocked Catalog catalog,
|
||||
@Injectable Database database,
|
||||
@Injectable OlapTable table) throws LoadException, AnalysisException {
|
||||
CreateRoutineLoadStmt createRoutineLoadStmt = initCreateRoutineLoadStmt();
|
||||
RoutineLoadDesc routineLoadDesc = new RoutineLoadDesc(columnSeparator, null, null, partitionNames.getPartitionNames());
|
||||
Deencapsulation.setField(createRoutineLoadStmt, "routineLoadDesc", routineLoadDesc);
|
||||
List<Integer> kafkaIntegerList = Lists.newArrayList();
|
||||
for (String s : kafkaPartitionString.split(",")) {
|
||||
kafkaIntegerList.add(Integer.valueOf(s));
|
||||
}
|
||||
Deencapsulation.setField(createRoutineLoadStmt, "kafkaPartitions", kafkaIntegerList);
|
||||
Deencapsulation.setField(createRoutineLoadStmt, "kafkaEndpoint", serverAddress);
|
||||
Deencapsulation.setField(createRoutineLoadStmt, "kafkaTopic", topicName);
|
||||
long dbId = 1l;
|
||||
long tableId = 2L;
|
||||
|
||||
new Expectations() {
|
||||
{
|
||||
catalog.getDb(dbName);
|
||||
result = database;
|
||||
database.getTable(tableNameString);
|
||||
result = table;
|
||||
database.getId();
|
||||
result = dbId;
|
||||
table.getId();
|
||||
result = tableId;
|
||||
table.getType();
|
||||
result = Table.TableType.OLAP;
|
||||
}
|
||||
};
|
||||
|
||||
KafkaRoutineLoadJob kafkaRoutineLoadJob = KafkaRoutineLoadJob.fromCreateStmt(createRoutineLoadStmt);
|
||||
Assert.assertEquals(jobName, kafkaRoutineLoadJob.getName());
|
||||
Assert.assertEquals(dbId, kafkaRoutineLoadJob.getDbId());
|
||||
Assert.assertEquals(tableId, kafkaRoutineLoadJob.getTableId());
|
||||
Assert.assertEquals(serverAddress, Deencapsulation.getField(kafkaRoutineLoadJob, "serverAddress"));
|
||||
Assert.assertEquals(topicName, Deencapsulation.getField(kafkaRoutineLoadJob, "topic"));
|
||||
List<Integer> kafkaPartitionResult = Deencapsulation.getField(kafkaRoutineLoadJob, "kafkaPartitions");
|
||||
Assert.assertEquals(kafkaPartitionString, Joiner.on(",").join(kafkaPartitionResult));
|
||||
Assert.assertEquals(routineLoadDesc, kafkaRoutineLoadJob.getRoutineLoadDesc());
|
||||
}
|
||||
|
||||
private CreateRoutineLoadStmt initCreateRoutineLoadStmt() {
|
||||
TableName tableName = new TableName(dbName, tableNameString);
|
||||
List<ParseNode> loadPropertyList = new ArrayList<>();
|
||||
loadPropertyList.add(columnSeparator);
|
||||
loadPropertyList.add(partitionNames);
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
|
||||
String typeName = LoadDataSourceType.KAFKA.name();
|
||||
Map<String, String> customProperties = Maps.newHashMap();
|
||||
|
||||
customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
|
||||
customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
|
||||
customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString);
|
||||
|
||||
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
|
||||
loadPropertyList, properties,
|
||||
typeName, customProperties);
|
||||
return createRoutineLoadStmt;
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,31 +18,216 @@
|
||||
package org.apache.doris.load.routineload;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import mockit.Deencapsulation;
|
||||
import mockit.Expectations;
|
||||
import mockit.Injectable;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import mockit.Mocked;
|
||||
import mockit.Verifications;
|
||||
import org.apache.doris.analysis.ColumnSeparator;
|
||||
import org.apache.doris.analysis.CreateRoutineLoadStmt;
|
||||
import org.apache.doris.analysis.ParseNode;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.LoadException;
|
||||
import org.apache.doris.common.SystemIdGenerator;
|
||||
import org.apache.doris.mysql.privilege.PaloAuth;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TResourceInfo;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.UUID;
|
||||
|
||||
public class RoutineLoadManagerTest {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(RoutineLoadManagerTest.class);
|
||||
|
||||
private static final int DEFAULT_BE_CONCURRENT_TASK_NUM = 100;
|
||||
|
||||
@Mocked
|
||||
private SystemInfoService systemInfoService;
|
||||
|
||||
@Test
|
||||
public void testAddJobByStmt(@Injectable PaloAuth paloAuth,
|
||||
@Injectable TResourceInfo tResourceInfo,
|
||||
@Mocked ConnectContext connectContext,
|
||||
@Mocked Catalog catalog) throws DdlException, LoadException, AnalysisException {
|
||||
String jobName = "job1";
|
||||
String dbName = "db1";
|
||||
String tableNameString = "table1";
|
||||
TableName tableName = new TableName(dbName, tableNameString);
|
||||
List<ParseNode> loadPropertyList = new ArrayList<>();
|
||||
ColumnSeparator columnSeparator = new ColumnSeparator(",");
|
||||
loadPropertyList.add(columnSeparator);
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
|
||||
String typeName = LoadDataSourceType.KAFKA.name();
|
||||
Map<String, String> customProperties = Maps.newHashMap();
|
||||
String topicName = "topic1";
|
||||
customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
|
||||
String serverAddress = "http://127.0.0.1:8080";
|
||||
customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
|
||||
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
|
||||
loadPropertyList, properties,
|
||||
typeName, customProperties);
|
||||
|
||||
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName);
|
||||
|
||||
|
||||
new MockUp<KafkaRoutineLoadJob>() {
|
||||
@Mock
|
||||
public KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) {
|
||||
return kafkaRoutineLoadJob;
|
||||
}
|
||||
};
|
||||
|
||||
new Expectations() {
|
||||
{
|
||||
catalog.getAuth();
|
||||
result = paloAuth;
|
||||
paloAuth.checkTblPriv((ConnectContext) any, dbName, tableNameString, PrivPredicate.LOAD);
|
||||
result = true;
|
||||
}
|
||||
};
|
||||
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
|
||||
routineLoadManager.addRoutineLoadJob(createRoutineLoadStmt);
|
||||
|
||||
Map<String, RoutineLoadJob> idToRoutineLoadJob =
|
||||
Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob");
|
||||
Assert.assertEquals(1, idToRoutineLoadJob.size());
|
||||
RoutineLoadJob routineLoadJob = idToRoutineLoadJob.values().iterator().next();
|
||||
Assert.assertEquals(1L, routineLoadJob.getDbId());
|
||||
Assert.assertEquals(jobName, routineLoadJob.getName());
|
||||
Assert.assertEquals(1L, routineLoadJob.getTableId());
|
||||
Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULER, routineLoadJob.getState());
|
||||
|
||||
Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob =
|
||||
Deencapsulation.getField(routineLoadManager, "dbToNameToRoutineLoadJob");
|
||||
Assert.assertEquals(1, dbToNameToRoutineLoadJob.size());
|
||||
Assert.assertEquals(Long.valueOf(1L), dbToNameToRoutineLoadJob.keySet().iterator().next());
|
||||
Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = dbToNameToRoutineLoadJob.get(1L);
|
||||
Assert.assertEquals(jobName, nameToRoutineLoadJob.keySet().iterator().next());
|
||||
Assert.assertEquals(1, nameToRoutineLoadJob.values().size());
|
||||
Assert.assertEquals(routineLoadJob, nameToRoutineLoadJob.values().iterator().next().get(0));
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateJobAuthDeny(@Injectable PaloAuth paloAuth,
|
||||
@Injectable TResourceInfo tResourceInfo,
|
||||
@Mocked ConnectContext connectContext,
|
||||
@Mocked Catalog catalog) {
|
||||
String jobName = "job1";
|
||||
String dbName = "db1";
|
||||
String tableNameString = "table1";
|
||||
TableName tableName = new TableName(dbName, tableNameString);
|
||||
List<ParseNode> loadPropertyList = new ArrayList<>();
|
||||
ColumnSeparator columnSeparator = new ColumnSeparator(",");
|
||||
loadPropertyList.add(columnSeparator);
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "2");
|
||||
String typeName = LoadDataSourceType.KAFKA.name();
|
||||
Map<String, String> customProperties = Maps.newHashMap();
|
||||
String topicName = "topic1";
|
||||
customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName);
|
||||
String serverAddress = "http://127.0.0.1:8080";
|
||||
customProperties.put(CreateRoutineLoadStmt.KAFKA_ENDPOINT_PROPERTY, serverAddress);
|
||||
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(jobName, tableName,
|
||||
loadPropertyList, properties,
|
||||
typeName, customProperties);
|
||||
|
||||
|
||||
new Expectations() {
|
||||
{
|
||||
catalog.getAuth();
|
||||
result = paloAuth;
|
||||
paloAuth.checkTblPriv((ConnectContext) any, dbName, tableNameString, PrivPredicate.LOAD);
|
||||
result = false;
|
||||
}
|
||||
};
|
||||
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
|
||||
try {
|
||||
routineLoadManager.addRoutineLoadJob(createRoutineLoadStmt);
|
||||
Assert.fail();
|
||||
} catch (LoadException | DdlException e) {
|
||||
Assert.fail();
|
||||
} catch (AnalysisException e) {
|
||||
LOG.info("Access deny");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateWithSameName(@Mocked ConnectContext connectContext) {
|
||||
String jobName = "job1";
|
||||
String topicName = "topic1";
|
||||
String serverAddress = "http://127.0.0.1:8080";
|
||||
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName);
|
||||
|
||||
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
|
||||
|
||||
Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap();
|
||||
Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newConcurrentMap();
|
||||
List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
|
||||
KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName);
|
||||
routineLoadJobList.add(kafkaRoutineLoadJobWithSameName);
|
||||
nameToRoutineLoadJob.put(jobName, routineLoadJobList);
|
||||
dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
|
||||
|
||||
Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob);
|
||||
try {
|
||||
routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob);
|
||||
Assert.fail();
|
||||
} catch (DdlException e) {
|
||||
LOG.info(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateWithSameNameOfStoppedJob(@Mocked ConnectContext connectContext) throws DdlException {
|
||||
String jobName = "job1";
|
||||
String topicName = "topic1";
|
||||
String serverAddress = "http://127.0.0.1:8080";
|
||||
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName);
|
||||
|
||||
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
|
||||
|
||||
Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newConcurrentMap();
|
||||
Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newConcurrentMap();
|
||||
List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
|
||||
KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(jobName, 1L, 1L, serverAddress, topicName);
|
||||
Deencapsulation.setField(kafkaRoutineLoadJobWithSameName, "state", RoutineLoadJob.JobState.STOPPED);
|
||||
routineLoadJobList.add(kafkaRoutineLoadJobWithSameName);
|
||||
nameToRoutineLoadJob.put(jobName, routineLoadJobList);
|
||||
dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
|
||||
Map<String, RoutineLoadJob> idToRoutineLoadJob = Maps.newConcurrentMap();
|
||||
idToRoutineLoadJob.put(UUID.randomUUID().toString(), kafkaRoutineLoadJobWithSameName);
|
||||
|
||||
Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob);
|
||||
Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob);
|
||||
routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob);
|
||||
|
||||
Map<Long, Map<String, List<RoutineLoadJob>>> result =
|
||||
Deencapsulation.getField(routineLoadManager, "dbToNameToRoutineLoadJob");
|
||||
Map<String, RoutineLoadJob> result1 = Deencapsulation.getField(routineLoadManager, "idToRoutineLoadJob");
|
||||
Assert.assertEquals(1, result.size());
|
||||
Assert.assertEquals(Long.valueOf(1L), result.keySet().iterator().next());
|
||||
Map<String, List<RoutineLoadJob>> resultNameToRoutineLoadJob = result.get(1L);
|
||||
Assert.assertEquals(jobName, resultNameToRoutineLoadJob.keySet().iterator().next());
|
||||
Assert.assertEquals(2, resultNameToRoutineLoadJob.values().iterator().next().size());
|
||||
Assert.assertEquals(2, result1.values().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMinTaskBeId() throws LoadException {
|
||||
List<Long> beIds = Lists.newArrayList();
|
||||
|
||||
Reference in New Issue
Block a user