[fix](routineload) check offset when scheduling tasks (#30136)

This commit is contained in:
HHoflittlefish777
2024-01-22 13:16:03 +08:00
committed by yiguolei
parent d0dd090458
commit 3e73933857
5 changed files with 74 additions and 16 deletions

View File

@@ -704,7 +704,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
// check if given partitions has more data to consume.
// 'partitionIdToOffset' to the offset to be consumed.
public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> partitionIdToOffset) {
public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> partitionIdToOffset) throws UserException {
for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
&& entry.getValue() < cachedPartitionWithLatestOffsets.get(entry.getKey())) {
@@ -734,11 +734,22 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
// check again
for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
&& entry.getValue() < cachedPartitionWithLatestOffsets.get(entry.getKey())) {
LOG.debug("has more data to consume. offsets to be consumed: {}, latest offsets: {}, task {}, job {}",
partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id);
return true;
Integer partitionId = entry.getKey();
if (cachedPartitionWithLatestOffsets.containsKey(partitionId)) {
long partitionLatestOffset = cachedPartitionWithLatestOffsets.get(partitionId);
long recordPartitionOffset = entry.getValue();
if (recordPartitionOffset < partitionLatestOffset) {
LOG.debug("has more data to consume. offsets to be consumed: {},"
+ " latest offsets: {}, task {}, job {}",
partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id);
return true;
} else if (recordPartitionOffset > partitionLatestOffset) {
String msg = "offset set in job: " + recordPartitionOffset
+ " is greater than kafka latest offset: "
+ partitionLatestOffset + " partition id: "
+ partitionId;
throw new UserException(msg);
}
}
}

View File

@@ -119,7 +119,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
}
@Override
boolean hasMoreDataToConsume() {
boolean hasMoreDataToConsume() throws UserException {
KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) routineLoadManager.getJob(jobId);
return routineLoadJob.hasMoreDataToConsume(id, partitionIdToOffset);
}

View File

@@ -209,7 +209,7 @@ public abstract class RoutineLoadTaskInfo {
abstract String getTaskDataSourceProperties();
abstract boolean hasMoreDataToConsume();
abstract boolean hasMoreDataToConsume() throws UserException;
@Override
public boolean equals(Object obj) {

View File

@@ -128,15 +128,15 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
return;
}
// check if topic has more data to consume
if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
needScheduleTasksQueue.put(routineLoadTaskInfo);
return;
}
// allocate BE slot for this task.
// this should be done before txn begin, or the txn may be begun successfully but failed to be allocated.
try {
// check if topic has more data to consume
if (!routineLoadTaskInfo.hasMoreDataToConsume()) {
needScheduleTasksQueue.put(routineLoadTaskInfo);
return;
}
// allocate BE slot for this task.
// this should be done before txn begin, or the txn may be begun successfully but failed to be allocated.
if (!allocateTaskToBe(routineLoadTaskInfo)) {
// allocate failed, push it back to the queue to wait next scheduling
needScheduleTasksQueue.put(routineLoadTaskInfo);