pick (#38474) When change routine load job topic from test_topic_before to test_topic_after by ``` ALTER ROUTINE LOAD FOR test_topic_change FROM KAFKA("kafka_topic" = "test_topic_after"); ``` (test_topic_before has 5 rows and test_topic_after has 1 rows) Exception happened, which cannot consume any data: ``` 2024-07-29 15:57:28,122 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,123 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,125 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,126 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,128 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,129 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,131 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. 
partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,133 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,134 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,136 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 2024-07-29 15:57:28,137 WARN (Routine load task scheduler|55) [KafkaRoutineLoadJob.hasMoreDataToConsume():792] Kafka offset fallback. partition: 0, cache offset: 5 get latest of fset: 1, task 16656914-ba0a-465d-8e79-8252b423b0fc, job 16615 ``` It is necessary to reset Kafka progress cache when routine load job topic change.
This commit is contained in:
@ -118,15 +118,17 @@ public class KafkaProgress extends RoutineLoadProgress {
|
||||
}
|
||||
}
|
||||
|
||||
// modify the partition offset of this progress.
|
||||
// throw exception if the specified partition does not exist in progress.
|
||||
public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
|
||||
// Validates that every requested partition is already tracked by this progress
// (i.e. present in partitionIdToOffset). Intended to run before modifyOffset so
// an ALTER ROUTINE LOAD modification can be validated up front and applied
// atomically (see the "check partitions before reset progress" note below).
// @param kafkaPartitionOffsets pairs of (partition id, offset) to validate
// @throws DdlException if any specified partition is not in the consumed set
public void checkPartitions(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws DdlException {
|
||||
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
|
||||
// pair.first is the Kafka partition id; only partitions this job already
// consumes may have their offsets modified.
if (!partitionIdToOffset.containsKey(pair.first)) {
|
||||
throw new DdlException("The specified partition " + pair.first + " is not in the consumed partitions");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// modify the partition offset of this progress.
|
||||
// throw exception if the specified partition does not exist in progress.
|
||||
public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) {
|
||||
for (Pair<Integer, Long> pair : kafkaPartitionOffsets) {
|
||||
partitionIdToOffset.put(pair.first, pair.second);
|
||||
}
|
||||
|
||||
@ -692,23 +692,33 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
|
||||
customKafkaProperties = dataSourceProperties.getCustomKafkaProperties();
|
||||
}
|
||||
|
||||
// modify partition offset first
|
||||
// convertCustomProperties and check partitions before reset progress to make modify operation atomic
|
||||
if (!customKafkaProperties.isEmpty()) {
|
||||
this.customProperties.putAll(customKafkaProperties);
|
||||
convertCustomProperties(true);
|
||||
}
|
||||
|
||||
if (!kafkaPartitionOffsets.isEmpty()) {
|
||||
((KafkaProgress) progress).checkPartitions(kafkaPartitionOffsets);
|
||||
}
|
||||
|
||||
// It is necessary to reset the Kafka progress cache if topic change,
|
||||
// and should reset cache before modifying partition offset.
|
||||
if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
|
||||
this.topic = dataSourceProperties.getTopic();
|
||||
this.progress = new KafkaProgress();
|
||||
}
|
||||
|
||||
// modify partition offset
|
||||
if (!kafkaPartitionOffsets.isEmpty()) {
|
||||
// we can only modify the partition that is being consumed
|
||||
((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets);
|
||||
}
|
||||
|
||||
if (!customKafkaProperties.isEmpty()) {
|
||||
this.customProperties.putAll(customKafkaProperties);
|
||||
convertCustomProperties(true);
|
||||
}
|
||||
// modify broker list and topic
|
||||
// modify broker list
|
||||
if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) {
|
||||
this.brokerList = dataSourceProperties.getBrokerList();
|
||||
}
|
||||
if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) {
|
||||
this.topic = dataSourceProperties.getTopic();
|
||||
}
|
||||
}
|
||||
if (!jobProperties.isEmpty()) {
|
||||
Map<String, String> copiedJobProperties = Maps.newHashMap(jobProperties);
|
||||
|
||||
Reference in New Issue
Block a user