[RoutineLoad] Support modify routine load job (#4158)

Support the ALTER ROUTINE LOAD statement, for example:

```
alter routine load for db1.label1
properties
(
"desired_concurrent_number"="3",
"max_batch_interval" = "5",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"timezone" = "+08:00"
)
```

Details can be found in `alter-routine-load.md`
Mingyu Chen
2020-08-06 23:11:02 +08:00
committed by GitHub
parent c98b411500
commit 237c0807a4
24 changed files with 1316 additions and 157 deletions

View File

@ -405,6 +405,7 @@ module.exports = [
"LOAD",
"MINI LOAD",
"MULTI LOAD",
"alter-routine-load",
"PAUSE ROUTINE LOAD",
"RESTORE TABLET",
"RESUME ROUTINE LOAD",

View File

@ -418,6 +418,7 @@ module.exports = [
"LOAD",
"MINI LOAD",
"MULTI LOAD",
"alter-routine-load",
"PAUSE ROUTINE LOAD",
"RESUME ROUTINE LOAD",
"ROUTINE LOAD",

View File

@ -237,6 +237,10 @@ Specific commands and examples for viewing the **Task** status can be viewed wit
Only currently running tasks can be viewed; tasks that have finished or have not yet started cannot be viewed.
### Alter job
Users can modify a job that has already been created. See the `HELP ALTER ROUTINE LOAD;` command for details, or refer to [ALTER ROUTINE LOAD](../../sql-reference/sql-statements/Data Manipulation/alter-routine-load.md).
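For example, to lower the batch interval and raise the allowed error count of a paused job (a minimal sketch; `example_db.example_job` and the values are placeholders, not part of this change):

```
ALTER ROUTINE LOAD FOR example_db.example_job
PROPERTIES
(
"max_batch_interval" = "10",
"max_error_number" = "1000"
);
```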
### Job Control
Users can stop, pause, and resume a job with the `STOP`, `PAUSE`, and `RESUME` commands. Help and examples are available via `HELP STOP ROUTINE LOAD;`, `HELP PAUSE ROUTINE LOAD;`, and `HELP RESUME ROUTINE LOAD;`.
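For example, assuming a job named `example_db.example_job` (a sketch of the statement forms only; see the HELP pages above for full options and output):

```
PAUSE ROUTINE LOAD FOR example_db.example_job;
RESUME ROUTINE LOAD FOR example_db.example_job;
STOP ROUTINE LOAD FOR example_db.example_job;
```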

View File

@ -0,0 +1,111 @@
---
{
"title": "ALTER ROUTINE LOAD",
"language": "en"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# ALTER ROUTINE LOAD
## description
This statement is used to modify a routine load job that has already been created.
Only jobs in the PAUSED state can be modified.
Syntax:
ALTER ROUTINE LOAD FOR [db.]job_name
[job_properties]
FROM data_source
[data_source_properties]
1. `[db.]job_name`
Specify the name of the job to be modified.
2. `job_properties`
Specify the job parameters to be modified. Currently only the following parameters can be modified:
1. `desired_concurrent_number`
2. `max_error_number`
3. `max_batch_interval`
4. `max_batch_rows`
5. `max_batch_size`
6. `jsonpaths`
7. `json_root`
8. `strip_outer_array`
9. `strict_mode`
10. `timezone`
3. `data_source`
The type of data source. Currently supported:
KAFKA
4. `data_source_properties`
The relevant properties of the data source. Currently only the following are supported:
1. `kafka_partitions`
2. `kafka_offsets`
3. Custom property, such as `property.group.id`
Notice:
1. `kafka_partitions` and `kafka_offsets` are used to modify the offsets of the Kafka partitions to be consumed. Only partitions that are currently being consumed can be modified; new partitions cannot be added.
## example
1. Modify `desired_concurrent_number` to 1
```
ALTER ROUTINE LOAD FOR db1.label1
PROPERTIES
(
"desired_concurrent_number" = "1"
);
```
2. Modify `desired_concurrent_number` to 10, modify the partition offsets, and modify the group id.
```
ALTER ROUTINE LOAD FOR db1.label1
PROPERTIES
(
"desired_concurrent_number" = "10"
)
FROM kafka
(
"kafka_partitions" = "0, 1, 2",
"kafka_offsets" = "100, 200, 100",
"property.group.id" = "new_group"
);
```
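3. A typical workflow sketch: since only jobs in the PAUSED state can be modified, pause the job first and resume it afterwards (the PAUSE/RESUME statements are documented in their own help pages; shown here only to illustrate the flow):
```
PAUSE ROUTINE LOAD FOR db1.label1;

ALTER ROUTINE LOAD FOR db1.label1
PROPERTIES
(
"max_batch_rows" = "300000"
);

RESUME ROUTINE LOAD FOR db1.label1;
```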
## keyword
ALTER,ROUTINE,LOAD

View File

@ -238,6 +238,10 @@ FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或
Only currently running tasks can be viewed; tasks that have finished or have not yet started cannot be viewed.
### Alter job properties
Users can modify a job that has already been created. See the `HELP ALTER ROUTINE LOAD;` command for details, or refer to [ALTER ROUTINE LOAD](../../sql-reference/sql-statements/Data Manipulation/alter-routine-load.md).
### Job control
Users can stop, pause, and resume a job with the `STOP`, `PAUSE`, and `RESUME` commands. Help and examples are available via `HELP STOP ROUTINE LOAD;`, `HELP PAUSE ROUTINE LOAD;`, and `HELP RESUME ROUTINE LOAD;`.

View File

@ -0,0 +1,115 @@
---
{
"title": "ALTER ROUTINE LOAD",
"language": "zh-CN"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# ALTER ROUTINE LOAD
## description
This statement is used to modify a routine load job that has already been created.
Only jobs in the PAUSED state can be modified.
Syntax:
ALTER ROUTINE LOAD FOR [db.]job_name
[job_properties]
FROM data_source
[data_source_properties]
1. `[db.]job_name`
Specify the name of the job to be modified.
2. `job_properties`
Specify the job parameters to be modified. Currently only the following parameters can be modified:
1. `desired_concurrent_number`
2. `max_error_number`
3. `max_batch_interval`
4. `max_batch_rows`
5. `max_batch_size`
6. `jsonpaths`
7. `json_root`
8. `strip_outer_array`
9. `strict_mode`
10. `timezone`
3. `data_source`
The type of the data source. Currently supported:
KAFKA
4. `data_source_properties`
The relevant properties of the data source. Currently only the following are supported:
1. `kafka_partitions`
2. `kafka_offsets`
3. Custom properties, such as `property.group.id`
Notice:
1. `kafka_partitions` and `kafka_offsets` are used to modify the offsets of the Kafka partitions to be consumed. Only partitions that are currently being consumed can be modified; new partitions cannot be added.
## example
1. Modify `desired_concurrent_number` to 1
```
ALTER ROUTINE LOAD FOR db1.label1
PROPERTIES
(
"desired_concurrent_number" = "1"
);
```
2. Modify `desired_concurrent_number` to 10, modify the partition offsets, and modify the group id.
```
ALTER ROUTINE LOAD FOR db1.label1
PROPERTIES
(
"desired_concurrent_number" = "10"
)
FROM kafka
(
"kafka_partitions" = "0, 1, 2",
"kafka_offsets" = "100, 200, 100",
"property.group.id" = "new_group"
);
```
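3. A sketch of resetting offsets only, assuming partitions 0 and 1 are already being consumed by the job and that the symbolic offsets `OFFSET_BEGINNING` / `OFFSET_END` handled by the shared Kafka offset parser also apply here:
```
ALTER ROUTINE LOAD FOR db1.label1
FROM kafka
(
"kafka_partitions" = "0, 1",
"kafka_offsets" = "OFFSET_BEGINNING, OFFSET_END"
);
```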
## keyword
ALTER,ROUTINE,LOAD

View File

@ -473,11 +473,13 @@ nonterminal Boolean opt_verbose;
nonterminal Boolean opt_tmp;
nonterminal OutFileClause opt_outfile;
nonterminal RoutineLoadDataSourceProperties opt_datasource_properties;
precedence nonassoc COMMA;
precedence nonassoc STRING_LITERAL;
precedence nonassoc KW_COLUMNS;
precedence nonassoc KW_WITH;
precedence left KW_FULL, KW_MERGE;
precedence left DOT;
precedence left SET_VAR;
@ -728,11 +730,26 @@ alter_stmt ::=
{:
RESULT = new AlterDatabaseQuotaStmt(dbName, QuotaType.REPLICA, String.valueOf(number));
:}
| KW_ALTER KW_DATABASE ident:dbName KW_RENAME ident:newDbName
{:
RESULT = new AlterDatabaseRename(dbName, newDbName);
:}
| KW_ALTER KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel opt_properties:jobProperties
opt_datasource_properties:datasourceProperties
{:
RESULT = new AlterRoutineLoadStmt(jobLabel, jobProperties, datasourceProperties);
:}
;
opt_datasource_properties ::=
// empty
{:
RESULT = new RoutineLoadDataSourceProperties();
:}
| KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN
{:
RESULT = new RoutineLoadDataSourceProperties(type, customProperties);
:}
;
quantity ::=

View File

@ -0,0 +1,189 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.Optional;
/**
* ALTER ROUTINE LOAD FOR db.label
* PROPERTIES(
* ...
* )
* FROM kafka (
* ...
* )
*/
public class AlterRoutineLoadStmt extends DdlStmt {
private static final String NAME_TYPE = "ROUTINE LOAD NAME";
private static final ImmutableSet<String> CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)
.add(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)
.add(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)
.add(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)
.add(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY)
.add(CreateRoutineLoadStmt.JSONPATHS)
.add(CreateRoutineLoadStmt.JSONROOT)
.add(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY)
.add(LoadStmt.STRICT_MODE)
.add(LoadStmt.TIMEZONE)
.build();
private final LabelName labelName;
private final Map<String, String> jobProperties;
private final RoutineLoadDataSourceProperties dataSourceProperties;
// save analyzed job properties.
// analyzed data source properties are saved in dataSourceProperties.
private Map<String, String> analyzedJobProperties = Maps.newHashMap();
public AlterRoutineLoadStmt(LabelName labelName, Map<String, String> jobProperties,
RoutineLoadDataSourceProperties dataSourceProperties) {
this.labelName = labelName;
this.jobProperties = jobProperties != null ? jobProperties : Maps.newHashMap();
this.dataSourceProperties = dataSourceProperties;
}
public String getDbName() {
return labelName.getDbName();
}
public String getLabel() {
return labelName.getLabelName();
}
public Map<String, String> getAnalyzedJobProperties() {
return analyzedJobProperties;
}
public boolean hasDataSourceProperty() {
return dataSourceProperties.hasAnalyzedProperties();
}
public RoutineLoadDataSourceProperties getDataSourceProperties() {
return dataSourceProperties;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
labelName.analyze(analyzer);
FeNameFormat.checkCommonName(NAME_TYPE, labelName.getLabelName());
// check routine load job properties include desired concurrent number etc.
checkJobProperties();
// check data source properties
checkDataSourceProperties();
if (analyzedJobProperties.isEmpty() && !dataSourceProperties.hasAnalyzedProperties()) {
throw new AnalysisException("No properties are specified");
}
}
private void checkJobProperties() throws UserException {
Optional<String> optional = jobProperties.keySet().stream().filter(
entity -> !CONFIGURABLE_PROPERTIES_SET.contains(entity)).findFirst();
if (optional.isPresent()) {
throw new AnalysisException(optional.get() + " is invalid property");
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)) {
long desiredConcurrentNum = ((Long) Util.getLongPropertyOrDefault(
jobProperties.get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY),
-1, CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PRED,
CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY + " should > 0")).intValue();
analyzedJobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY,
String.valueOf(desiredConcurrentNum));
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)) {
long maxErrorNum = Util.getLongPropertyOrDefault(
jobProperties.get(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY),
-1, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PRED,
CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY + " should >= 0");
analyzedJobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY,
String.valueOf(maxErrorNum));
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)) {
long maxBatchIntervalS = Util.getLongPropertyOrDefault(
jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY),
-1, CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_PRED,
CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY + " should between 5 and 60");
analyzedJobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY,
String.valueOf(maxBatchIntervalS));
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)) {
long maxBatchRows = Util.getLongPropertyOrDefault(
jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY),
-1, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PRED,
CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY + " should > 200000");
analyzedJobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY,
String.valueOf(maxBatchRows));
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY)) {
long maxBatchSizeBytes = Util.getLongPropertyOrDefault(
jobProperties.get(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY),
-1, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PRED,
CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY + " should between 100MB and 1GB");
analyzedJobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY,
String.valueOf(maxBatchSizeBytes));
}
if (jobProperties.containsKey(LoadStmt.STRICT_MODE)) {
boolean strictMode = Boolean.valueOf(jobProperties.get(LoadStmt.STRICT_MODE));
analyzedJobProperties.put(LoadStmt.STRICT_MODE, String.valueOf(strictMode));
}
if (jobProperties.containsKey(LoadStmt.TIMEZONE)) {
String timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.get(LoadStmt.TIMEZONE));
analyzedJobProperties.put(LoadStmt.TIMEZONE, timezone);
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.JSONPATHS)) {
analyzedJobProperties.put(CreateRoutineLoadStmt.JSONPATHS, jobProperties.get(CreateRoutineLoadStmt.JSONPATHS));
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.JSONROOT)) {
analyzedJobProperties.put(CreateRoutineLoadStmt.JSONROOT, jobProperties.get(CreateRoutineLoadStmt.JSONROOT));
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY)) {
boolean stripOuterArray = Boolean.valueOf(jobProperties.get(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY));
analyzedJobProperties.put(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY,
String.valueOf(stripOuterArray));
}
}
private void checkDataSourceProperties() throws AnalysisException {
dataSourceProperties.analyze();
}
}

View File

@ -17,10 +17,6 @@
package org.apache.doris.analysis;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeNameFormat;
@ -34,6 +30,11 @@ import org.apache.doris.load.routineload.LoadDataSourceType;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -162,14 +163,14 @@ public class CreateRoutineLoadStmt extends DdlStmt {
// pair<partition id, offset>
private List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
//custom kafka property map<key, value>
// custom kafka property map<key, value>
private Map<String, String> customKafkaProperties = Maps.newHashMap();
private static final Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v) -> { return v > 0L; };
private static final Predicate<Long> MAX_ERROR_NUMBER_PRED = (v) -> { return v >= 0L; };
private static final Predicate<Long> MAX_BATCH_INTERVAL_PRED = (v) -> { return v >= 5 && v <= 60; };
private static final Predicate<Long> MAX_BATCH_ROWS_PRED = (v) -> { return v >= 200000; };
private static final Predicate<Long> MAX_BATCH_SIZE_PRED = (v) -> { return v >= 100 * 1024 * 1024 && v <= 1024 * 1024 * 1024; };
public static final Predicate<Long> DESIRED_CONCURRENT_NUMBER_PRED = (v) -> { return v > 0L; };
public static final Predicate<Long> MAX_ERROR_NUMBER_PRED = (v) -> { return v >= 0L; };
public static final Predicate<Long> MAX_BATCH_INTERVAL_PRED = (v) -> { return v >= 5 && v <= 60; };
public static final Predicate<Long> MAX_BATCH_ROWS_PRED = (v) -> { return v >= 200000; };
public static final Predicate<Long> MAX_BATCH_SIZE_PRED = (v) -> { return v >= 100 * 1024 * 1024 && v <= 1024 * 1024 * 1024; };
public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNode> loadPropertyList,
Map<String, String> jobProperties,
@ -366,12 +367,12 @@ public class CreateRoutineLoadStmt extends DdlStmt {
format = jobProperties.get(FORMAT);
if (format != null) {
if (format.equalsIgnoreCase("csv")) {
format = "";// if it's not json, then it's mean csv and set empty
format = ""; // if it's not json, then it's mean csv and set empty
} else if (format.equalsIgnoreCase("json")) {
format = "json";
jsonPaths = jobProperties.get(JSONPATHS);
jsonRoot = jobProperties.get(JSONROOT);
stripOuterArray = Boolean.valueOf(jobProperties.get(STRIP_OUTER_ARRAY));
stripOuterArray = Boolean.valueOf(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false"));
} else {
throw new UserException("Format type is invalid. format=`" + format + "`");
}
@ -424,58 +425,74 @@ public class CreateRoutineLoadStmt extends DdlStmt {
}
// check partitions
final String kafkaPartitionsString = dataSourceProperties.get(KAFKA_PARTITIONS_PROPERTY);
String kafkaPartitionsString = dataSourceProperties.get(KAFKA_PARTITIONS_PROPERTY);
if (kafkaPartitionsString != null) {
kafkaPartitionsString.replaceAll(" ", "");
if (kafkaPartitionsString.isEmpty()) {
throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY + " could not be a empty string");
}
String[] kafkaPartionsStringList = kafkaPartitionsString.split(",");
for (String s : kafkaPartionsStringList) {
try {
kafkaPartitionOffsets.add(Pair.create(getIntegerValueFromString(s, KAFKA_PARTITIONS_PROPERTY),
KafkaProgress.OFFSET_END_VAL));
} catch (AnalysisException e) {
throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY
+ " must be a number string with comma-separated");
}
}
analyzeKafkaPartitionProperty(kafkaPartitionsString, this.kafkaPartitionOffsets);
}
// check offset
final String kafkaOffsetsString = dataSourceProperties.get(KAFKA_OFFSETS_PROPERTY);
String kafkaOffsetsString = dataSourceProperties.get(KAFKA_OFFSETS_PROPERTY);
if (kafkaOffsetsString != null) {
kafkaOffsetsString.replaceAll(" ", "");
if (kafkaOffsetsString.isEmpty()) {
throw new AnalysisException(KAFKA_OFFSETS_PROPERTY + " could not be a empty string");
}
String[] kafkaOffsetsStringList = kafkaOffsetsString.split(",");
if (kafkaOffsetsStringList.length != kafkaPartitionOffsets.size()) {
throw new AnalysisException("Partitions number should be equals to offsets number");
}
analyzeKafkaOffsetProperty(kafkaOffsetsString, this.kafkaPartitionOffsets);
}
for (int i = 0; i < kafkaOffsetsStringList.length; i++) {
// defined in librdkafka/rdkafkacpp.h
// OFFSET_BEGINNING: -2
// OFFSET_END: -1
try {
kafkaPartitionOffsets.get(i).second = getLongValueFromString(kafkaOffsetsStringList[i],
KAFKA_OFFSETS_PROPERTY);
if (kafkaPartitionOffsets.get(i).second < 0) {
throw new AnalysisException("Cannot specify offset smaller than 0");
}
} catch (AnalysisException e) {
if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_BEGINNING_VAL;
} else if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_END_VAL;
} else {
throw e;
}
// check custom kafka property
analyzeCustomProperties(this.dataSourceProperties, this.customKafkaProperties);
}
public static void analyzeKafkaPartitionProperty(String kafkaPartitionsString,
List<Pair<Integer, Long>> kafkaPartitionOffsets) throws AnalysisException {
kafkaPartitionsString = kafkaPartitionsString.replaceAll(" ", "");
if (kafkaPartitionsString.isEmpty()) {
throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY + " could not be a empty string");
}
String[] kafkaPartionsStringList = kafkaPartitionsString.split(",");
for (String s : kafkaPartionsStringList) {
try {
kafkaPartitionOffsets.add(Pair.create(getIntegerValueFromString(s, KAFKA_PARTITIONS_PROPERTY),
KafkaProgress.OFFSET_END_VAL));
} catch (AnalysisException e) {
throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY
+ " must be a number string with comma-separated");
}
}
}
public static void analyzeKafkaOffsetProperty(String kafkaOffsetsString,
List<Pair<Integer, Long>> kafkaPartitionOffsets) throws AnalysisException {
kafkaOffsetsString = kafkaOffsetsString.replaceAll(" ", "");
if (kafkaOffsetsString.isEmpty()) {
throw new AnalysisException(KAFKA_OFFSETS_PROPERTY + " could not be a empty string");
}
String[] kafkaOffsetsStringList = kafkaOffsetsString.split(",");
if (kafkaOffsetsStringList.length != kafkaPartitionOffsets.size()) {
throw new AnalysisException("Partitions number should be equals to offsets number");
}
for (int i = 0; i < kafkaOffsetsStringList.length; i++) {
// defined in librdkafka/rdkafkacpp.h
// OFFSET_BEGINNING: -2
// OFFSET_END: -1
try {
kafkaPartitionOffsets.get(i).second = getLongValueFromString(kafkaOffsetsStringList[i],
KAFKA_OFFSETS_PROPERTY);
if (kafkaPartitionOffsets.get(i).second < 0) {
throw new AnalysisException("Can not specify offset smaller than 0");
}
} catch (AnalysisException e) {
if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_BEGINNING_VAL;
} else if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_END_VAL;
} else {
throw e;
}
}
}
// check custom kafka property
}
public static void analyzeCustomProperties(Map<String, String> dataSourceProperties,
Map<String, String> customKafkaProperties) throws AnalysisException {
for (Map.Entry<String, String> dataSourceProperty : dataSourceProperties.entrySet()) {
if (dataSourceProperty.getKey().startsWith("property.")) {
String propertyKey = dataSourceProperty.getKey();
@ -486,11 +503,11 @@ public class CreateRoutineLoadStmt extends DdlStmt {
}
customKafkaProperties.put(propertyKey.substring(propertyKey.indexOf(".") + 1), propertyValue);
}
//can be extended in the future which other prefix
// can be extended in the future with other prefixes
}
}
private int getIntegerValueFromString(String valueString, String propertyName) throws AnalysisException {
private static int getIntegerValueFromString(String valueString, String propertyName) throws AnalysisException {
if (valueString.isEmpty()) {
throw new AnalysisException(propertyName + " could not be a empty string");
}
@ -503,7 +520,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
return value;
}
private long getLongValueFromString(String valueString, String propertyName) throws AnalysisException {
private static long getLongValueFromString(String valueString, String propertyName) throws AnalysisException {
if (valueString.isEmpty()) {
throw new AnalysisException(propertyName + " could not be a empty string");
}
@ -511,7 +528,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
try {
value = Long.valueOf(valueString);
} catch (NumberFormatException e) {
throw new AnalysisException(propertyName + " must be a integer");
throw new AnalysisException(propertyName + " must be a integer: " + valueString);
}
return value;
}

View File

@ -0,0 +1,138 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.load.routineload.LoadDataSourceType;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class RoutineLoadDataSourceProperties {
private static final ImmutableSet<String> CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)
.build();
@SerializedName(value = "type")
private String type = "KAFKA";
// origin properties, no need to persist
private Map<String, String> properties = Maps.newHashMap();
@SerializedName(value = "kafkaPartitionOffsets")
private List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
@SerializedName(value = "customKafkaProperties")
private Map<String, String> customKafkaProperties = Maps.newHashMap();
public RoutineLoadDataSourceProperties() {
// empty
}
public RoutineLoadDataSourceProperties(String type, Map<String, String> properties) {
this.type = type.toUpperCase();
this.properties = properties;
}
public void analyze() throws AnalysisException {
checkDataSourceProperties();
}
public boolean hasAnalyzedProperties() {
return !kafkaPartitionOffsets.isEmpty() || !customKafkaProperties.isEmpty();
}
public String getType() {
return type;
}
public List<Pair<Integer, Long>> getKafkaPartitionOffsets() {
return kafkaPartitionOffsets;
}
public Map<String, String> getCustomKafkaProperties() {
return customKafkaProperties;
}
private void checkDataSourceProperties() throws AnalysisException {
LoadDataSourceType sourceType;
try {
sourceType = LoadDataSourceType.valueOf(type);
} catch (IllegalArgumentException e) {
throw new AnalysisException("routine load job does not support this type " + type);
}
switch (sourceType) {
case KAFKA:
checkKafkaProperties();
break;
default:
break;
}
}
private void checkKafkaProperties() throws AnalysisException {
Optional<String> optional = properties.keySet().stream().filter(
entity -> !CONFIGURABLE_PROPERTIES_SET.contains(entity)).filter(
entity -> !entity.startsWith("property.")).findFirst();
if (optional.isPresent()) {
throw new AnalysisException(optional.get() + " is invalid kafka custom property");
}
// check partitions
final String kafkaPartitionsString = properties.get(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY);
if (kafkaPartitionsString != null) {
if (!properties.containsKey(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)) {
throw new AnalysisException("Partition and offset must be specified at the same time");
}
CreateRoutineLoadStmt.analyzeKafkaPartitionProperty(kafkaPartitionsString, kafkaPartitionOffsets);
} else {
if (properties.containsKey(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)) {
throw new AnalysisException("Missing kafka partition info");
}
}
// check offset
String kafkaOffsetsString = properties.get(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY);
if (kafkaOffsetsString != null) {
CreateRoutineLoadStmt.analyzeKafkaOffsetProperty(kafkaOffsetsString, kafkaPartitionOffsets);
}
// check custom properties
CreateRoutineLoadStmt.analyzeCustomProperties(properties, customKafkaProperties);
}
@Override
public String toString() {
if (!hasAnalyzedProperties()) {
return "empty";
}
StringBuilder sb = new StringBuilder();
sb.append("type: ").append(type);
sb.append(", kafka partition offsets: ").append(kafkaPartitionOffsets);
sb.append(", custome properties: ").append(customKafkaProperties);
return sb.toString();
}
}

View File

@ -26,9 +26,9 @@ import org.apache.doris.backup.Repository;
import org.apache.doris.backup.RestoreJob;
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.FunctionSearchDesc;
import org.apache.doris.catalog.Resource;
import org.apache.doris.cluster.BaseParam;
import org.apache.doris.cluster.Cluster;
import org.apache.doris.common.io.Text;
@ -41,11 +41,12 @@ import org.apache.doris.load.DeleteInfo;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.master.Checkpoint;
import org.apache.doris.mysql.privilege.UserPropertyInfo;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.persist.AlterViewInfo;
import org.apache.doris.persist.BackendIdsUpdateInfo;
import org.apache.doris.persist.BackendTabletsInfo;
@ -575,6 +576,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_ALTER_ROUTINE_LOAD_JOB: {
data = AlterRoutineLoadJobOperationLog.read(in);
isRead = true;
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);

View File

@ -17,6 +17,7 @@
package org.apache.doris.load.routineload;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.thrift.TKafkaRLTaskProgress;
@ -105,6 +106,21 @@ public class KafkaProgress extends RoutineLoadProgress {
}
}
// Modify the partition offsets of this progress.
// Throws an exception if the specified partition does not exist in the progress.
public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
if (!partitionIdToOffset.containsKey(pair.first)) {
throw new DdlException("The specified partition " + pair.first + " is not in the consumed partitions");
}
}
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
partitionIdToOffset.put(pair.first, pair.second);
}
}
@Override
public String toString() {
Map<Integer, String> showPartitionIdToOffset = Maps.newHashMap();
@ -147,4 +163,5 @@ public class KafkaProgress extends RoutineLoadProgress {
partitionIdToOffset.put(in.readInt(), in.readLong());
}
}
}

View File

@ -17,7 +17,9 @@
package org.apache.doris.load.routineload;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
@ -38,6 +40,7 @@ import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.SmallFileMgr;
import org.apache.doris.common.util.SmallFileMgr.SmallFile;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.transaction.TransactionStatus;
@ -86,7 +89,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
public KafkaRoutineLoadJob(Long id, String name, String clusterName,
long dbId, long tableId, String brokerList, String topic) {
long dbId, long tableId, String brokerList, String topic) {
super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA);
this.brokerList = brokerList;
this.topic = topic;
@ -105,23 +108,27 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
return convertedCustomProperties;
}
public void resetConvertedCustomProperties() {
convertedCustomProperties.clear();
}
@Override
public void prepare() throws UserException {
super.prepare();
// should reset converted properties each time the job being prepared.
// because the file info can be changed anytime.
resetConvertedCustomProperties();
convertCustomProperties();
convertCustomProperties(true);
}
private void convertCustomProperties() throws DdlException {
if (!convertedCustomProperties.isEmpty() || customProperties.isEmpty()) {
private void convertCustomProperties(boolean rebuild) throws DdlException {
if (customProperties.isEmpty()) {
return;
}
if (!rebuild && !convertedCustomProperties.isEmpty()) {
return;
}
if (rebuild) {
convertedCustomProperties.clear();
}
SmallFileMgr smallFileMgr = Catalog.getCurrentCatalog().getSmallFileMgr();
for (Map.Entry<String, String> entry : customProperties.entrySet()) {
if (entry.getValue().startsWith("FILE:")) {
@ -184,10 +191,10 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
LOG.debug("current concurrent task number is min"
+ "(partition num: {}, desire task concurrent num: {}, alive be num: {}, config: {})",
+ "(partition num: {}, desire task concurrent num: {}, alive be num: {}, config: {})",
partitionNum, desireTaskConcurrentNum, aliveBeNum, Config.max_routine_load_task_concurrent_num);
currentTaskConcurrentNum = Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)),
Config.max_routine_load_task_concurrent_num);
Config.max_routine_load_task_concurrent_num);
return currentTaskConcurrentNum;
}
@ -202,20 +209,20 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
// otherwise currentErrorNum and currentTotalNum is updated when progress is not updated
@Override
protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment,
TransactionStatus txnStatus) {
TransactionStatus txnStatus) {
if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 && txnStatus == TransactionStatus.ABORTED) {
// case 1
return false;
}
if (rlTaskTxnCommitAttachment.getLoadedRows() > 0
&& (!((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).hasPartition())) {
// case 2
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()))
.add("job_id", id)
.add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows())
.add("progress_partition_offset_size", 0)
.add("msg", "commit attachment info is incorrect"));
.add("job_id", id)
.add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows())
.add("progress_partition_offset_size", 0)
.add("msg", "commit attachment info is incorrect"));
return false;
}
return true;
@ -269,12 +276,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
newCurrentKafkaPartition = getAllKafkaPartitions();
} catch (Exception e) {
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("error_msg", "Job failed to fetch all current partition with error " + e.getMessage())
.build(), e);
.add("error_msg", "Job failed to fetch all current partition with error " + e.getMessage())
.build(), e);
if (this.state == JobState.NEED_SCHEDULE) {
unprotectUpdateState(JobState.PAUSED,
new ErrorReason(InternalErrorCode.PARTITIONS_ERR,
"Job failed to fetch all current partition with error " + e.getMessage()),
"Job failed to fetch all current partition with error " + e.getMessage()),
false /* not replay */);
}
return false;
@ -284,9 +291,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
currentKafkaPartitions = newCurrentKafkaPartition;
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions))
.add("msg", "current kafka partitions has been change")
.build());
.add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions))
.add("msg", "current kafka partitions has been change")
.build());
}
return true;
} else {
@ -296,9 +303,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
currentKafkaPartitions = newCurrentKafkaPartition;
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions))
.add("msg", "current kafka partitions has been change")
.build());
.add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions))
.add("msg", "current kafka partitions has been change")
.build());
}
return true;
}
@ -328,7 +335,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
private List<Integer> getAllKafkaPartitions() throws UserException {
convertCustomProperties();
convertCustomProperties(false);
return KafkaUtil.getAllKafkaPartitions(brokerList, topic, convertedCustomProperties);
}
@ -406,9 +413,9 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
((KafkaProgress) progress).addPartitionOffset(Pair.create(kafkaPartition, beginOffSet));
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("kafka_partition_id", kafkaPartition)
.add("begin_offset", beginOffSet)
.add("msg", "The new partition has been added in job"));
.add("kafka_partition_id", kafkaPartition)
.add("begin_offset", beginOffSet)
.add("msg", "The new partition has been added in job"));
}
}
}
@ -437,6 +444,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
private void setCustomKafkaProperties(Map<String, String> kafkaProperties) {
this.customProperties = kafkaProperties;
}
@Override
protected String dataSourcePropertiesJsonToString() {
Map<String, String> dataSourceProperties = Maps.newHashMap();
@ -484,7 +492,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_51) {
int count = in.readInt();
for (int i = 0 ;i < count ;i ++) {
for (int i = 0; i < count; i++) {
String propertyKey = Text.readString(in);
String propertyValue = Text.readString(in);
if (propertyKey.startsWith("property.")) {
@ -493,4 +501,68 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
}
}
@Override
public void modifyProperties(AlterRoutineLoadStmt stmt) throws DdlException {
Map<String, String> jobProperties = stmt.getAnalyzedJobProperties();
RoutineLoadDataSourceProperties dataSourceProperties = stmt.getDataSourceProperties();
writeLock();
try {
if (getState() != JobState.PAUSED) {
throw new DdlException("Only supports modification of PAUSED jobs");
}
modifyPropertiesInternal(jobProperties, dataSourceProperties);
AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id,
jobProperties, dataSourceProperties);
Catalog.getCurrentCatalog().getEditLog().logAlterRoutineLoadJob(log);
} finally {
writeUnlock();
}
}
private void modifyPropertiesInternal(Map<String, String> jobProperties,
RoutineLoadDataSourceProperties dataSourceProperties)
throws DdlException {
List<Pair<Integer, Long>> kafkaPartitionOffsets = Lists.newArrayList();
Map<String, String> customKafkaProperties = Maps.newHashMap();
if (dataSourceProperties.hasAnalyzedProperties()) {
kafkaPartitionOffsets = dataSourceProperties.getKafkaPartitionOffsets();
customKafkaProperties = dataSourceProperties.getCustomKafkaProperties();
}
// modify partition offset first
if (!kafkaPartitionOffsets.isEmpty()) {
// we can only modify the partition that is being consumed
((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
}
if (!jobProperties.isEmpty()) {
Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties);
modifyCommonJobProperties(copiedJobProperties);
this.jobProperties.putAll(copiedJobProperties);
}
if (!customKafkaProperties.isEmpty()) {
this.customProperties.putAll(customKafkaProperties);
convertCustomProperties(true);
}
LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}",
this.id, jobProperties, dataSourceProperties);
}
@Override
public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
try {
modifyPropertiesInternal(log.getJobProperties(), log.getDataSourceProperties());
} catch (DdlException e) {
// should not happen
LOG.error("failed to replay modify kafka routine load job: {}", id, e);
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.load.routineload;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
import org.apache.doris.analysis.ColumnSeparator;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.Expr;
@ -48,6 +49,7 @@ import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.qe.ConnectContext;
@ -1242,6 +1244,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
jobProperties.put("maxBatchRows", String.valueOf(maxBatchRows));
jobProperties.put("maxBatchSizeBytes", String.valueOf(maxBatchSizeBytes));
jobProperties.put("currentTaskConcurrentNum", String.valueOf(currentTaskConcurrentNum));
jobProperties.put("desireTaskConcurrentNum", String.valueOf(desireTaskConcurrentNum));
jobProperties.putAll(this.jobProperties);
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
return gson.toJson(jobProperties);
}
@ -1410,4 +1414,36 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
throw new IOException("error happens when parsing create routine load stmt: " + origStmt, e);
}
}
abstract public void modifyProperties(AlterRoutineLoadStmt stmt) throws DdlException;
abstract public void replayModifyProperties(AlterRoutineLoadJobOperationLog log);
// for ALTER ROUTINE LOAD
protected void modifyCommonJobProperties(Map<String, String> jobProperties) {
if (jobProperties.containsKey(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)) {
this.desireTaskConcurrentNum = Integer.valueOf(
jobProperties.remove(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY));
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)) {
this.maxErrorNum = Long.valueOf(
jobProperties.remove(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY));
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)) {
this.maxBatchIntervalS = Long.valueOf(
jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY));
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)) {
this.maxBatchRows = Long.valueOf(
jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY));
}
if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY)) {
this.maxBatchSizeBytes = Long.valueOf(
jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY));
}
}
}

View File

@ -17,12 +17,14 @@
package org.apache.doris.load.routineload;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.PauseRoutineLoadStmt;
import org.apache.doris.analysis.ResumeRoutineLoadStmt;
import org.apache.doris.analysis.StopRoutineLoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
@ -35,9 +37,11 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.persist.RoutineLoadOperation;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@ -201,11 +205,11 @@ public class RoutineLoadManager implements Writable {
return false;
}
public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt)
throws UserException {
RoutineLoadJob routineLoadJob = getJob(pauseRoutineLoadStmt.getDbFullName(), pauseRoutineLoadStmt.getName());
private RoutineLoadJob checkPrivAndGetJob(String dbName, String jobName)
throws MetaNotFoundException, DdlException, AnalysisException {
RoutineLoadJob routineLoadJob = getJob(dbName, jobName);
if (routineLoadJob == null) {
throw new DdlException("There is not operable routine load job with name " + pauseRoutineLoadStmt.getName());
throw new DdlException("There is not operable routine load job with name " + jobName);
}
// check auth
String dbFullName;
@ -225,41 +229,27 @@ public class RoutineLoadManager implements Writable {
ConnectContext.get().getRemoteIP(),
tableName);
}
return routineLoadJob;
}
public void pauseRoutineLoadJob(PauseRoutineLoadStmt pauseRoutineLoadStmt)
throws UserException {
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(pauseRoutineLoadStmt.getDbFullName(),
pauseRoutineLoadStmt.getName());
routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
"User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"),
false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())
.add("user", ConnectContext.get().getQualifiedUser())
.add("msg", "routine load job has been paused by user")
.build());
new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
"User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"),
false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId()).add("current_state",
routineLoadJob.getState()).add("user", ConnectContext.get().getQualifiedUser()).add("msg",
"routine load job has been paused by user").build());
}
public void resumeRoutineLoadJob(ResumeRoutineLoadStmt resumeRoutineLoadStmt) throws UserException {
RoutineLoadJob routineLoadJob = getJob(resumeRoutineLoadStmt.getDbFullName(), resumeRoutineLoadStmt.getName());
if (routineLoadJob == null) {
throw new DdlException("There is not operable routine load job with name " + resumeRoutineLoadStmt.getName() + ".");
}
// check auth
String dbFullName;
String tableName;
try {
dbFullName = routineLoadJob.getDbFullName();
tableName = routineLoadJob.getTableName();
} catch (MetaNotFoundException e) {
throw new DdlException("The metadata of job has been changed. The job will be cancelled automatically", e);
}
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
dbFullName,
tableName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
tableName);
}
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(resumeRoutineLoadStmt.getDbFullName(),
resumeRoutineLoadStmt.getName());
routineLoadJob.autoResumeCount = 0;
routineLoadJob.firstResumeTimestamp = 0;
routineLoadJob.autoResumeLock = false;
@ -273,31 +263,12 @@ public class RoutineLoadManager implements Writable {
public void stopRoutineLoadJob(StopRoutineLoadStmt stopRoutineLoadStmt)
throws UserException {
RoutineLoadJob routineLoadJob = getJob(stopRoutineLoadStmt.getDbFullName(), stopRoutineLoadStmt.getName());
if (routineLoadJob == null) {
throw new DdlException("There is not operable routine load job with name " + stopRoutineLoadStmt.getName());
}
// check auth
String dbFullName;
String tableName;
try {
dbFullName = routineLoadJob.getDbFullName();
tableName = routineLoadJob.getTableName();
} catch (MetaNotFoundException e) {
throw new DdlException("The metadata of job has been changed. The job will be cancelled automatically", e);
}
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
dbFullName,
tableName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
tableName);
}
RoutineLoadJob routineLoadJob = checkPrivAndGetJob(stopRoutineLoadStmt.getDbFullName(),
stopRoutineLoadStmt.getName());
routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED,
new ErrorReason(InternalErrorCode.MANUAL_STOP_ERR, "User " + ConnectContext.get().getQualifiedUser() + " stop routine load job"),
false /* not replay */);
new ErrorReason(InternalErrorCode.MANUAL_STOP_ERR,
"User " + ConnectContext.get().getQualifiedUser() + " stop routine load job"),
false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())
.add("user", ConnectContext.get().getQualifiedUser())
@ -602,6 +573,24 @@ public class RoutineLoadManager implements Writable {
.add("msg", "replay change routine load job")
.build());
}
/**
* Entry point for altering a routine load job
*/
public void alterRoutineLoadJob(AlterRoutineLoadStmt stmt) throws UserException {
RoutineLoadJob job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel());
if (stmt.hasDataSourceProperty()
&& !stmt.getDataSourceProperties().getType().equalsIgnoreCase(job.dataSourceType.name())) {
throw new DdlException("The specified job type is not: " + stmt.getDataSourceProperties().getType());
}
job.modifyProperties(stmt);
}
public void replayAlterRoutineLoadJob(AlterRoutineLoadJobOperationLog log) {
RoutineLoadJob job = getJob(log.getJobId());
Preconditions.checkNotNull(job, log.getJobId());
job.replayModifyProperties(log);
}
@Override
public void write(DataOutput out) throws IOException {

View File

@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;
import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
public class AlterRoutineLoadJobOperationLog implements Writable {
@SerializedName(value = "jobId")
private long jobId;
@SerializedName(value = "jobProperties")
private Map<String, String> jobProperties;
@SerializedName(value = "dataSourceProperties")
private RoutineLoadDataSourceProperties dataSourceProperties;
public AlterRoutineLoadJobOperationLog(long jobId, Map<String, String> jobProperties,
RoutineLoadDataSourceProperties dataSourceProperties) {
this.jobId = jobId;
this.jobProperties = jobProperties;
this.dataSourceProperties = dataSourceProperties;
}
public long getJobId() {
return jobId;
}
public Map<String, String> getJobProperties() {
return jobProperties;
}
public RoutineLoadDataSourceProperties getDataSourceProperties() {
return dataSourceProperties;
}
public static AlterRoutineLoadJobOperationLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, AlterRoutineLoadJobOperationLog.class);
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
}

View File

@ -784,6 +784,11 @@ public class EditLog {
}
break;
}
case OperationType.OP_ALTER_ROUTINE_LOAD_JOB: {
AlterRoutineLoadJobOperationLog log = (AlterRoutineLoadJobOperationLog) journal.getData();
catalog.getRoutineLoadManager().replayAlterRoutineLoadJob(log);
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@ -1346,4 +1351,8 @@ public class EditLog {
public void logRemoveExpiredAlterJobV2(RemoveAlterJobV2OperationLog log) {
logEdit(OperationType.OP_REMOVE_ALTER_JOB_V2, log);
}
public void logAlterRoutineLoadJob(AlterRoutineLoadJobOperationLog log) {
logEdit(OperationType.OP_ALTER_ROUTINE_LOAD_JOB, log);
}
}

View File

@ -142,6 +142,7 @@ public class OperationType {
// routine load 110~120
public static final short OP_ROUTINE_LOAD_JOB = 110;
public static final short OP_ALTER_ROUTINE_LOAD_JOB = 111;
// UDF 130-140
public static final short OP_ADD_FUNCTION = 130;

View File

@ -21,9 +21,9 @@ import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.alter.RollupJobV2;
import org.apache.doris.alter.SchemaChangeJobV2;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
@ -349,4 +349,4 @@ public class GsonUtils {
}
}
}
}

View File

@ -25,6 +25,7 @@ import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
import org.apache.doris.analysis.AlterClusterStmt;
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
import org.apache.doris.analysis.AlterDatabaseRename;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
import org.apache.doris.analysis.AlterSystemStmt;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.AlterViewStmt;
@ -73,7 +74,6 @@ import org.apache.doris.analysis.SetUserPropertyStmt;
import org.apache.doris.analysis.StopRoutineLoadStmt;
import org.apache.doris.analysis.SyncStmt;
import org.apache.doris.analysis.TruncateTableStmt;
import org.apache.doris.analysis.AlterViewStmt;
import org.apache.doris.analysis.UninstallPluginStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.Config;
@ -146,7 +146,9 @@ public class DdlExecutor {
catalog.getRoutineLoadManager().resumeRoutineLoadJob((ResumeRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof StopRoutineLoadStmt) {
catalog.getRoutineLoadManager().stopRoutineLoadJob((StopRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof DeleteStmt) {
} else if (ddlStmt instanceof AlterRoutineLoadStmt) {
catalog.getRoutineLoadManager().alterRoutineLoadJob((AlterRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof DeleteStmt) {
catalog.getDeleteHandler().process((DeleteStmt) ddlStmt);
} else if (ddlStmt instanceof CreateUserStmt) {
CreateUserStmt stmt = (CreateUserStmt) ddlStmt;

View File

@ -0,0 +1,234 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Map;
import mockit.Expectations;
import mockit.Mocked;
/*
* Author: Chenmingyu
* Date: Jul 20, 2020
*/
public class AlterRoutineLoadStmtTest {
private Analyzer analyzer;
@Mocked
private PaloAuth auth;
@Before
public void setUp() {
analyzer = AccessTestUtil.fetchAdminAnalyzer(false);
new Expectations() {
{
auth.checkGlobalPriv((ConnectContext) any, (PrivPredicate) any);
minTimes = 0;
result = true;
auth.checkDbPriv((ConnectContext) any, anyString, (PrivPredicate) any);
minTimes = 0;
result = true;
auth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any);
minTimes = 0;
result = true;
}
};
}
@Test
public void testNormal() {
{
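// Valid alter: change max_error_number and max_batch_rows, and update the kafka
// partition offsets plus two custom kafka properties.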
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, "200000");
String typeName = "kafka";
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put("property.client.id", "101");
dataSourceProperties.put("property.group.id", "mygroup");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 20000, 30000");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
} catch (UserException e) {
Assert.fail();
}
Assert.assertEquals(2, stmt.getAnalyzedJobProperties().size());
Assert.assertTrue(stmt.getAnalyzedJobProperties().containsKey(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY));
Assert.assertTrue(stmt.getAnalyzedJobProperties().containsKey(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY));
Assert.assertTrue(stmt.hasDataSourceProperty());
Assert.assertEquals(2, stmt.getDataSourceProperties().getCustomKafkaProperties().size());
Assert.assertTrue(stmt.getDataSourceProperties().getCustomKafkaProperties().containsKey("group.id"));
Assert.assertTrue(stmt.getDataSourceProperties().getCustomKafkaProperties().containsKey("client.id"));
Assert.assertEquals(3, stmt.getDataSourceProperties().getKafkaPartitionOffsets().size());
}
}
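// An ALTER with neither job properties nor data source properties should fail analysis.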
@Test(expected = AnalysisException.class)
public void testNoProperties() throws AnalysisException, UserException {
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
Maps.newHashMap(), new RoutineLoadDataSourceProperties());
stmt.analyze(analyzer);
}
@Test
public void testUnsupportedProperties() {
{
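// "format" is not a modifiable job property.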
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.FORMAT, "csv");
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, new RoutineLoadDataSourceProperties());
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("format is invalid property"));
} catch (UserException e) {
Assert.fail();
}
}
{
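// The kafka topic of an existing job cannot be changed.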
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
String typeName = "kafka";
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "new_topic");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("kafka_topic is invalid kafka custom property"));
} catch (UserException e) {
Assert.fail();
}
}
{
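// kafka_partitions without matching kafka_offsets is rejected.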
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
String typeName = "kafka";
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("Partition and offset must be specified at the same time"));
} catch (UserException e) {
Assert.fail();
}
}
{
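// The number of offsets must match the number of partitions.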
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
String typeName = "kafka";
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000, 2000");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("Partitions number should be equals to offsets number"));
} catch (UserException e) {
Assert.fail();
}
}
{
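// kafka_offsets without kafka_partitions is rejected.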
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
String typeName = "kafka";
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000, 2000, 3000");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("Missing kafka partition info"));
} catch (UserException e) {
Assert.fail();
}
}
{
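// max_batch_size of 200000 bytes is outside the allowed 100MB to 1GB range.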
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, "200000");
String typeName = "kafka";
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put("property.client.id", "101");
dataSourceProperties.put("property.group.id", "mygroup");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 20000, 30000");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(
typeName, dataSourceProperties);
AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"),
jobProperties, routineLoadDataSourceProperties);
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("max_batch_size should between 100MB and 1GB"));
} catch (UserException e) {
Assert.fail();
}
}
}
}

View File

@ -867,4 +867,47 @@ public class RoutineLoadManagerTest {
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState());
}
@Test
public void testAlterRoutineLoadJob(@Injectable StopRoutineLoadStmt stopRoutineLoadStmt,
@Mocked Catalog catalog,
@Mocked Database database,
@Mocked PaloAuth paloAuth,
@Mocked ConnectContext connectContext) throws UserException {
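// Register a single kafka routine load job under db id 1 with mocked catalog/auth,
// run the statement through the manager, and verify the resulting job state.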
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
Map<Long, Map<String, List<RoutineLoadJob>>> dbToNameToRoutineLoadJob = Maps.newHashMap();
Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newHashMap();
List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
routineLoadJobList.add(routineLoadJob);
nameToRoutineLoadJob.put("", routineLoadJobList);
dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob);
new Expectations() {
{
stopRoutineLoadStmt.getDbFullName();
minTimes = 0;
result = "";
stopRoutineLoadStmt.getName();
minTimes = 0;
result = "";
catalog.getDb("");
minTimes = 0;
result = database;
database.getId();
minTimes = 0;
result = 1L;
catalog.getAuth();
minTimes = 0;
result = paloAuth;
paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, (PrivPredicate) any);
minTimes = 0;
result = true;
}
};
routineLoadManager.stopRoutineLoadJob(stopRoutineLoadStmt);
Assert.assertEquals(RoutineLoadJob.JobState.STOPPED, routineLoadJob.getState());
}
}

View File

@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.RoutineLoadDataSourceProperties;
import org.apache.doris.common.AnalysisException;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Map;
public class AlterRoutineLoadOperationLogTest {
private static String fileName = "./AlterRoutineLoadOperationLogTest";
@Test
public void testSerializeAlterRoutineLoadOperationLog() throws IOException, AnalysisException {
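// Round-trip an AlterRoutineLoadJobOperationLog through write()/read() and check that
// the job properties and data source properties survive serialization.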
// 1. Write objects to file
File file = new File(fileName);
file.createNewFile();
DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
long jobId = 1000;
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "5");
String typeName = "kafka";
Map<String, String> dataSourceProperties = Maps.newHashMap();
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1");
dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 20000");
dataSourceProperties.put("property.group.id", "mygroup");
RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(typeName,
dataSourceProperties);
routineLoadDataSourceProperties.analyze();
AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(jobId,
jobProperties, routineLoadDataSourceProperties);
log.write(out);
out.flush();
out.close();
// 2. Read objects from file
DataInputStream in = new DataInputStream(new FileInputStream(file));
AlterRoutineLoadJobOperationLog log2 = AlterRoutineLoadJobOperationLog.read(in);
Assert.assertEquals(1, log2.getJobProperties().size());
Assert.assertEquals("5", log2.getJobProperties().get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY));
Assert.assertEquals(1, log2.getDataSourceProperties().getCustomKafkaProperties().size());
Assert.assertEquals("mygroup", log2.getDataSourceProperties().getCustomKafkaProperties().get("group.id"));
Assert.assertEquals(routineLoadDataSourceProperties.getKafkaPartitionOffsets().get(0),
log2.getDataSourceProperties().getKafkaPartitionOffsets().get(0));
Assert.assertEquals(routineLoadDataSourceProperties.getKafkaPartitionOffsets().get(1),
log2.getDataSourceProperties().getKafkaPartitionOffsets().get(1));
in.close();
}
}

View File

@ -97,9 +97,9 @@ else
# eg:
# sh run-fe-ut.sh --run org.apache.doris.utframe.Demo
# sh run-fe-ut.sh --run org.apache.doris.utframe.Demo#testCreateDbAndTable+test2
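# -DfailIfNoTests=false keeps the build from failing when the given pattern matches no tests in a module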
${MVN_CMD} test -D test=$1
${MVN_CMD} test -DfailIfNoTests=false -D test=$1
else
echo "Run Frontend UT"
${MVN_CMD} test
${MVN_CMD} test -DfailIfNoTests=false
fi
fi