[RoutineLoad] Support alter broker list and topic for kafka routine load (#6335)

```
alter routine load for cmy2 from kafka("kafka_broker_list" = "ip2:9094", "kafka_topic" = "my_topic");
```

This is useful when the kafka broker list or topic has been changed.

Also modify `show create routine load` to support showing "kafka_partitions" and "kafka_offsets".
Mingyu Chen
2021-08-03 11:58:38 +08:00
committed by GitHub
parent a2eb1b9d5b
commit 748604ff4f
14 changed files with 101 additions and 39 deletions

View File

@@ -26,7 +26,10 @@ under the License.
# SHOW CREATE ROUTINE LOAD
## description
The statement is used to show the routine load job creation statement of user-defined
This statement is used to show the creation statement of a user-defined routine load job.
The kafka partitions and offsets in the result show the partitions currently being consumed and the next offsets to be consumed.
Syntax:
SHOW [ALL] CREATE ROUTINE LOAD for load_name;
@@ -39,4 +42,4 @@ under the License.
SHOW CREATE ROUTINE LOAD for test_load
## keyword
SHOW,CREATE,ROUTINE,LOAD
SHOW,CREATE,ROUTINE,LOAD

View File

@@ -72,7 +72,9 @@ Syntax:
1. `kafka_partitions`
2. `kafka_offsets`
3. Custom property, such as `property.group.id`
3. `kafka_broker_list`
4. `kafka_topic`
5. Custom property, such as `property.group.id`
Notice:

View File

@@ -26,7 +26,9 @@ under the License.
# SHOW CREATE ROUTINE LOAD
## description
This statement is used to show the creation statement of a routine load job
This statement is used to show the creation statement of a routine load job.
The kafka partitions and offsets in the result show the partitions currently being consumed and the corresponding offsets to be consumed.
Syntax:
SHOW [ALL] CREATE ROUTINE LOAD for load_name;
@@ -39,4 +41,4 @@ under the License.
SHOW CREATE ROUTINE LOAD for test_load
## keyword
SHOW,CREATE,ROUTINE,LOAD
SHOW,CREATE,ROUTINE,LOAD

View File

@@ -76,7 +76,9 @@ under the License.
1. `kafka_partitions`
2. `kafka_offsets`
3. Custom property, such as `property.group.id`
3. `kafka_broker_list`
4. `kafka_topic`
5. Custom property, such as `property.group.id`
Note:

View File

@@ -51,6 +51,8 @@ public class RoutineLoadDataSourceProperties {
.build();
private static final ImmutableSet<String> CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)
.add(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)
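For context, a minimal sketch of how a whitelist like `CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET` is typically consumed when the ALTER statement is analyzed: with this patch, `kafka_broker_list` and `kafka_topic` join partitions/offsets as alterable keys. The `validateAlterableProperties` helper and its error message are illustrative, not the actual Doris analyze code.

```
import com.google.common.collect.ImmutableSet;
import java.util.Map;

public class AlterablePropertySketch {
    // Stand-in for CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET after this change:
    // broker list and topic are now legal keys in ALTER ROUTINE LOAD ... FROM kafka(...).
    private static final ImmutableSet<String> ALTERABLE_KEYS = ImmutableSet.of(
            "kafka_broker_list", "kafka_topic", "kafka_partitions", "kafka_offsets", "kafka_default_offsets");

    // Hypothetical check: custom properties keep their "property." prefix,
    // everything else must be in the whitelist.
    static void validateAlterableProperties(Map<String, String> props) {
        for (String key : props.keySet()) {
            if (!key.startsWith("property.") && !ALTERABLE_KEYS.contains(key)) {
                throw new IllegalArgumentException(key + " is invalid kafka property");
            }
        }
    }
}
```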

View File

@@ -34,6 +34,10 @@ public class Separator implements ParseNode {
this.separator = null;
}
public String getOriSeparator() {
return oriSeparator;
}
public String getSeparator() {
return separator;
}
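The new getter exposes the separator text as it was written in the original CREATE statement, while `getSeparator()` returns the parsed value; `getShowCreateInfo` (later in this diff) switches to the original form so the regenerated statement round-trips. A rough sketch of that distinction, assuming a simple `\xNN` escape convention, which is only an approximation of Doris's actual separator parsing:

```
public class SeparatorSketch {
    private final String oriSeparator;  // as written in SQL, e.g. "\\x01"
    private final String separator;     // parsed value actually used to split columns

    SeparatorSketch(String oriSeparator) {
        this.oriSeparator = oriSeparator;
        // Assumed convention: a "\xNN" escape is resolved to the raw character.
        this.separator = oriSeparator.startsWith("\\x")
                ? String.valueOf((char) Integer.parseInt(oriSeparator.substring(2), 16))
                : oriSeparator;
    }

    // Used by SHOW CREATE ROUTINE LOAD so the printed statement can be re-executed as-is.
    public String getOriSeparator() {
        return oriSeparator;
    }

    public String getSeparator() {
        return separator;
    }
}
```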

View File

@@ -27,9 +27,9 @@ public class ShowCreateRoutineLoadStmt extends ShowStmt {
private static final ShowResultSetMetaData META_DATA =
ShowResultSetMetaData.builder()
.addColumn(new Column("Routine Load Id", ScalarType.createVarchar(20)))
.addColumn(new Column("Routine Load Name", ScalarType.createVarchar(20)))
.addColumn(new Column("Create Routine Load", ScalarType.createVarchar(30)))
.addColumn(new Column("JobId", ScalarType.createVarchar(128)))
.addColumn(new Column("JobName", ScalarType.createVarchar(128)))
.addColumn(new Column("CreateStmt", ScalarType.createVarchar(65535)))
.build();
private final LabelName labelName;

View File

@@ -22,13 +22,14 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.thrift.TKafkaRLTaskProgress;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -128,6 +129,26 @@ public class KafkaProgress extends RoutineLoadProgress {
}
}
public List<Pair<Integer, String>> getPartitionOffsetPairs(boolean alreadyConsumed) {
List<Pair<Integer, String>> pairs = Lists.newArrayList();
for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
if (entry.getValue() == 0) {
pairs.add(Pair.create(entry.getKey(), OFFSET_ZERO));
} else if (entry.getValue() == -1) {
pairs.add(Pair.create(entry.getKey(), OFFSET_END));
} else if (entry.getValue() == -2) {
pairs.add(Pair.create(entry.getKey(), OFFSET_BEGINNING));
} else {
long offset = entry.getValue();
if (alreadyConsumed) {
offset -= 1;
}
pairs.add(Pair.create(entry.getKey(), "" + offset));
}
}
return pairs;
}
@Override
public String toString() {
Map<Integer, String> showPartitionIdToOffset = Maps.newHashMap();
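The offsets tracked in `partitionIdToOffset` are the next offsets to be consumed, with a few sentinel values; `getPartitionOffsetPairs` turns them into display strings, subtracting one when the caller asks for the already-consumed offset. A self-contained sketch of that mapping, with stand-in constants for the ones `KafkaProgress` defines elsewhere:

```
// Sketch only: the sentinel values (0, -1, -2) match the checks in the hunk above;
// the constant strings stand in for the ones declared elsewhere in KafkaProgress.
final class OffsetDisplaySketch {
    static final String OFFSET_ZERO = "OFFSET_ZERO";
    static final String OFFSET_END = "OFFSET_END";
    static final String OFFSET_BEGINNING = "OFFSET_BEGINNING";

    static String display(long nextOffset, boolean alreadyConsumed) {
        if (nextOffset == 0) {
            return OFFSET_ZERO;        // consume from offset 0
        } else if (nextOffset == -1) {
            return OFFSET_END;         // consume from the latest offset
        } else if (nextOffset == -2) {
            return OFFSET_BEGINNING;   // consume from the earliest offset
        }
        // The stored value is the next offset to consume; subtract one to report
        // the offset that has already been consumed.
        return String.valueOf(alreadyConsumed ? nextOffset - 1 : nextOffset);
    }
}
```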

View File

@@ -55,15 +55,16 @@ import com.google.gson.GsonBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.parquet.Strings;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.TimeZone;
import java.util.UUID;
@@ -554,7 +555,6 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
throw new DdlException("Only supports modification of PAUSED jobs");
}
modifyPropertiesInternal(jobProperties, dataSourceProperties);
AlterRoutineLoadJobOperationLog log = new AlterRoutineLoadJobOperationLog(this.id,
@@ -593,15 +593,23 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
}
if (!customKafkaProperties.isEmpty()) {
this.customProperties.putAll(customKafkaProperties);
convertCustomProperties(true);
}
if (!jobProperties.isEmpty()) {
Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties);
modifyCommonJobProperties(copiedJobProperties);
this.jobProperties.putAll(copiedJobProperties);
}
if (!customKafkaProperties.isEmpty()) {
this.customProperties.putAll(customKafkaProperties);
convertCustomProperties(true);
// modify broker list and topic
if (!Strings.isNullOrEmpty(dataSourceProperties.getKafkaBrokerList())) {
this.brokerList = dataSourceProperties.getKafkaBrokerList();
}
if (!Strings.isNullOrEmpty(dataSourceProperties.getKafkaTopic())) {
this.topic = dataSourceProperties.getKafkaTopic();
}
LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}",
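Condensing the flow above into a standalone sketch: the job must be PAUSED before its properties can be modified, and the broker list and topic are only overwritten when the ALTER statement actually supplied them. Field and method names here are illustrative, not the real `KafkaRoutineLoadJob` members, and Guava's `Strings` stands in for the null/empty check used in the hunk.

```
import com.google.common.base.Strings;

class AlterKafkaJobSketch {
    enum JobState { RUNNING, PAUSED, STOPPED }

    private JobState state = JobState.PAUSED;
    private String brokerList = "ip1:9092";
    private String topic = "my_topic";

    // Hypothetical condensed view of modifyPropertiesInternal for the data source part.
    void modifyDataSource(String newBrokerList, String newTopic) {
        if (state != JobState.PAUSED) {
            throw new IllegalStateException("Only supports modification of PAUSED jobs");
        }
        if (!Strings.isNullOrEmpty(newBrokerList)) {
            this.brokerList = newBrokerList;   // e.g. "ip2:9094" from the ALTER statement
        }
        if (!Strings.isNullOrEmpty(newTopic)) {
            this.topic = newTopic;
        }
    }
}
```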

View File

@@ -39,6 +39,7 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
@@ -1325,10 +1326,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
// 4.load_properties
// 4.1.column_separator
if (columnSeparator != null) {
sb.append("COLUMNS TERMINATED BY \"").append(columnSeparator.getSeparator()).append("\",\n");
sb.append("COLUMNS TERMINATED BY \"").append(columnSeparator.getOriSeparator()).append("\",\n");
}
// 4.2.columns_mapping
if (columnDescs != null) {
if (columnDescs != null && !columnDescs.descs.isEmpty()) {
sb.append("COLUMNS(").append(Joiner.on(",").join(columnDescs.descs)).append("),\n");
}
// 4.3.where_predicates
@@ -1352,22 +1353,25 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
sb.append("PRECEDING FILTER ").append(precedingFilter.toSql()).append(",\n");
}
// remove the last ,
if (",".equals(sb.charAt(sb.length() - 2))) {
if (sb.charAt(sb.length() - 2) == ',') {
sb.replace(sb.length() - 2, sb.length() - 1, "");
}
// 5.job_properties
// 5.job_properties. See PROPERTIES_SET of CreateRoutineLoadStmt
sb.append("PROPERTIES\n(\n");
appendProperties(sb, CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, desireTaskConcurrentNum, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, maxBatchIntervalS, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, maxBatchRows, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, maxBatchSizeBytes, false);
appendProperties(sb, CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, maxErrorNum, false);
appendProperties(sb, LoadStmt.STRICT_MODE, isStrictMode(), false);
appendProperties(sb, LoadStmt.TIMEZONE, getTimezone(), false);
appendProperties(sb, PROPS_FORMAT, getFormat(), false);
appendProperties(sb, PROPS_JSONPATHS, getJsonPaths(), false);
appendProperties(sb, PROPS_STRIP_OUTER_ARRAY, isStripOuterArray(), false);
appendProperties(sb, PROPS_NUM_AS_STRING, isNumAsString(), false);
appendProperties(sb, PROPS_FUZZY_PARSE, isFuzzyParse(), false);
appendProperties(sb, PROPS_JSONROOT, getJsonRoot(), true);
appendProperties(sb, LoadStmt.STRICT_MODE, isStrictMode(), false);
appendProperties(sb, LoadStmt.TIMEZONE, getTimezone(), false);
appendProperties(sb, LoadStmt.EXEC_MEM_LIMIT, getMemLimit(), true);
sb.append(")\n");
// 6. data_source
sb.append("FROM ").append(dataSourceType).append("\n");
@@ -1375,13 +1379,25 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
sb.append("(\n");
getDataSourceProperties().forEach((k, v) -> appendProperties(sb, k, v, false));
getCustomProperties().forEach((k, v) -> appendProperties(sb, k, v, false));
// remove the last ,
if (progress instanceof KafkaProgress) {
// append partitions and offsets.
// the offsets is the next offset to be consumed.
List<Pair<Integer, String>> pairs = ((KafkaProgress) progress).getPartitionOffsetPairs(false);
appendProperties(sb, CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY,
Joiner.on(", ").join(pairs.stream().map(p -> p.first).toArray()), false);
appendProperties(sb, CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY,
Joiner.on(", ").join(pairs.stream().map(p -> p.second).toArray()), false);
}
// remove the last ","
sb.replace(sb.length() - 2, sb.length() - 1, "");
sb.append(");");
return sb.toString();
}
private static void appendProperties(StringBuilder sb, String key, Object value, boolean end) {
if (value == null || Strings.isNullOrEmpty(value.toString())) {
return;
}
sb.append("\"").append(key).append("\"").append(" = ").append("\"").append(value).append("\"");
if (!end) {
sb.append(",\n");

View File

@@ -40,7 +40,7 @@ public abstract class RoutineLoadProgress implements Writable {
abstract void update(RLTaskTxnCommitAttachment attachment);
abstract String toJsonString();
public static RoutineLoadProgress read(DataInput in) throws IOException {
RoutineLoadProgress progress = null;
LoadDataSourceType type = LoadDataSourceType.valueOf(Text.readString(in));

View File

@@ -125,6 +125,7 @@ public class AlterRoutineLoadStmtTest {
}
}
// alter topic is now supported
{
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100");
@@ -138,9 +139,8 @@ public class AlterRoutineLoadStmtTest {
try {
stmt.analyze(analyzer);
Assert.fail();
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("kafka_topic is invalid kafka property"));
Assert.fail();
} catch (UserException e) {
e.printStackTrace();
Assert.fail();

View File

@@ -291,7 +291,7 @@ public class RoutineLoadDataSourcePropertiesTest {
@Test
public void testAlterAbnormal() {
// can not set KAFKA_BROKER_LIST_PROPERTY
// now support set KAFKA_BROKER_LIST_PROPERTY
Map<String, String> properties = Maps.newHashMap();
properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080");
properties.put("property." + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "-1");
@@ -301,7 +301,7 @@ public class RoutineLoadDataSourcePropertiesTest {
dsProperties.analyze();
Assert.fail();
} catch (UserException e) {
Assert.assertTrue(e.getMessage().contains("kafka_broker_list is invalid kafka property"));
Assert.assertTrue(e.getMessage().contains("kafka_default_offsets can only be set to OFFSET_BEGINNING, OFFSET_END or date time"));
}
// can not set datetime formatted offset and integer offset together
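The expected error message changes because `kafka_broker_list` is no longer rejected outright; analysis now proceeds to the default-offsets check. A rough sketch of what a `kafka_default_offsets` validation along the lines of that message could look like, with the accepted datetime format assumed rather than taken from the Doris source:

```
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

final class DefaultOffsetsValidatorSketch {
    // Assumed format; the real parser may accept other datetime shapes.
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static void validate(String value) {
        if ("OFFSET_BEGINNING".equals(value) || "OFFSET_END".equals(value)) {
            return;
        }
        try {
            LocalDateTime.parse(value, FMT);
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException(
                    "kafka_default_offsets can only be set to OFFSET_BEGINNING, OFFSET_END or date time");
        }
    }
}
```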

View File

@@ -31,14 +31,14 @@ import org.apache.doris.thrift.TKafkaRLTaskProgress;
import org.apache.doris.transaction.TransactionException;
import org.apache.doris.transaction.TransactionState;
import org.apache.kafka.common.PartitionInfo;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.kafka.common.PartitionInfo;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Map;
@@ -328,22 +328,24 @@ public class RoutineLoadJobTest {
"PROPERTIES\n" +
"(\n" +
"\"desired_concurrent_number\" = \"0\",\n" +
"\"max_error_number\" = \"10\",\n" +
"\"max_batch_interval\" = \"10\",\n" +
"\"max_batch_rows\" = \"10\",\n" +
"\"max_batch_size\" = \"104857600\",\n" +
"\"max_error_number\" = \"10\",\n" +
"\"format\" = \"csv\",\n" +
"\"strip_outer_array\" = \"false\",\n" +
"\"num_as_string\" = \"false\",\n" +
"\"fuzzy_parse\" = \"false\",\n" +
"\"strict_mode\" = \"false\",\n" +
"\"timezone\" = \"Asia/Shanghai\",\n" +
"\"format\" = \"csv\",\n" +
"\"jsonpaths\" = \"\",\n" +
"\"strip_outer_array\" = \"false\",\n" +
"\"json_root\" = \"\"\n" +
"\"exec_mem_limit\" = \"2147483648\"\n" +
")\n" +
"FROM KAFKA\n" +
"(\n" +
"\"kafka_broker_list\" = \"localhost:9092\",\n" +
"\"kafka_topic\" = \"test_topic\"\n" +
");";
System.out.println(showCreateInfo);
Assert.assertEquals(expect, showCreateInfo);
}