From 3e73933857d1173a11c4bc0114bf495afb3cb537 Mon Sep 17 00:00:00 2001
From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com>
Date: Mon, 22 Jan 2024 13:16:03 +0800
Subject: [PATCH] [fix](routineload) check offset when schedule tasks (#30136)

---
 .../load/routineload/KafkaRoutineLoadJob.java | 23 ++++++---
 .../doris/load/routineload/KafkaTaskInfo.java |  2 +-
 .../load/routineload/RoutineLoadTaskInfo.java |  2 +-
 .../routineload/RoutineLoadTaskScheduler.java | 16 +++----
 .../test_routine_load_error.groovy            | 47 +++++++++++++++++++
 5 files changed, 74 insertions(+), 16 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index d7a090c23e..faad0a0248 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -704,7 +704,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     // check if given partitions has more data to consume.
     // 'partitionIdToOffset' to the offset to be consumed.
-    public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> partitionIdToOffset) {
+    public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> partitionIdToOffset) throws UserException {
         for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
             if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
                     && entry.getValue() < cachedPartitionWithLatestOffsets.get(entry.getKey())) {
@@ -734,11 +734,22 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
         // check again
         for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
-            if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
-                    && entry.getValue() < cachedPartitionWithLatestOffsets.get(entry.getKey())) {
-                LOG.debug("has more data to consume. offsets to be consumed: {}, latest offsets: {}, task {}, job {}",
-                        partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id);
-                return true;
+            Integer partitionId = entry.getKey();
+            if (cachedPartitionWithLatestOffsets.containsKey(partitionId)) {
+                long partitionLatestOffset = cachedPartitionWithLatestOffsets.get(partitionId);
+                long recordPartitionOffset = entry.getValue();
+                if (recordPartitionOffset < partitionLatestOffset) {
+                    LOG.debug("has more data to consume. offsets to be consumed: {},"
+                            + " latest offsets: {}, task {}, job {}",
+                            partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id);
+                    return true;
+                } else if (recordPartitionOffset > partitionLatestOffset) {
+                    String msg = "offset set in job: " + recordPartitionOffset
+                            + " is greater than kafka latest offset: "
+                            + partitionLatestOffset + " partition id: "
+                            + partitionId;
+                    throw new UserException(msg);
+                }
             }
         }
 
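Read on its own, the new check is a per-partition three-way comparison: a requested offset below the cached latest offset means there is still data to consume, an equal offset means the task is caught up, and a higher offset can never be satisfied, so it now fails fast instead of being rescheduled forever. A minimal standalone sketch of that decision, with a simplified stand-in for Doris's UserException (class and method names here are illustrative, not the Doris code):

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetCheckSketch {
        // Simplified stand-in for Doris's UserException.
        static class UserException extends Exception {
            UserException(String msg) { super(msg); }
        }

        // Returns true if any partition still has data behind the latest offset;
        // throws if a requested offset lies beyond what Kafka reports.
        static boolean hasMoreDataToConsume(Map<Integer, Long> toConsume,
                                            Map<Integer, Long> latest) throws UserException {
            for (Map.Entry<Integer, Long> entry : toConsume.entrySet()) {
                Long latestOffset = latest.get(entry.getKey());
                if (latestOffset == null) {
                    continue; // no cached metadata for this partition yet
                }
                if (entry.getValue() < latestOffset) {
                    return true; // data available between requested and latest offset
                } else if (entry.getValue() > latestOffset) {
                    throw new UserException("offset set in job: " + entry.getValue()
                            + " is greater than kafka latest offset: " + latestOffset
                            + " partition id: " + entry.getKey());
                }
            }
            return false; // every partition is exactly caught up
        }

        public static void main(String[] args) throws UserException {
            Map<Integer, Long> latest = new HashMap<>();
            latest.put(0, 50L);

            Map<Integer, Long> ok = new HashMap<>();
            ok.put(0, 10L);
            System.out.println(hasMoreDataToConsume(ok, latest)); // true

            Map<Integer, Long> outOfRange = new HashMap<>();
            outOfRange.put(0, 100L);
            try {
                hasMoreDataToConsume(outOfRange, latest);
            } catch (UserException e) {
                System.out.println(e.getMessage()); // out-of-range offset now surfaces
            }
        }
    }

Returning false when every partition is exactly caught up is what lets the scheduler park the task back on its queue instead of starting an empty transaction.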
offsets to be consumed: {}," + + " latest offsets: {}, task {}, job {}", + partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id); + return true; + } else if (recordPartitionOffset > partitionLatestOffset) { + String msg = "offset set in job: " + recordPartitionOffset + + " is greater than kafka latest offset: " + + partitionLatestOffset + " partition id: " + + partitionId; + throw new UserException(msg); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index 2075e5548e..a8d387a2f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -119,7 +119,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { } @Override - boolean hasMoreDataToConsume() { + boolean hasMoreDataToConsume() throws UserException { KafkaRoutineLoadJob routineLoadJob = (KafkaRoutineLoadJob) routineLoadManager.getJob(jobId); return routineLoadJob.hasMoreDataToConsume(id, partitionIdToOffset); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index fd8e818232..9c28dbfc6a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -209,7 +209,7 @@ public abstract class RoutineLoadTaskInfo { abstract String getTaskDataSourceProperties(); - abstract boolean hasMoreDataToConsume(); + abstract boolean hasMoreDataToConsume() throws UserException; @Override public boolean equals(Object obj) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java index 0f594a2d50..a6cc796027 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java @@ -128,15 +128,15 @@ public class RoutineLoadTaskScheduler extends MasterDaemon { return; } - // check if topic has more data to consume - if (!routineLoadTaskInfo.hasMoreDataToConsume()) { - needScheduleTasksQueue.put(routineLoadTaskInfo); - return; - } - - // allocate BE slot for this task. - // this should be done before txn begin, or the txn may be begun successfully but failed to be allocated. try { + // check if topic has more data to consume + if (!routineLoadTaskInfo.hasMoreDataToConsume()) { + needScheduleTasksQueue.put(routineLoadTaskInfo); + return; + } + + // allocate BE slot for this task. + // this should be done before txn begin, or the txn may be begun successfully but failed to be allocated. 
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
index b04610740d..c4180e35f4 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
@@ -91,4 +91,51 @@ suite("test_routine_load_error","p0") {
             sql "stop routine load for testTableNoExist"
         }
     }
+
+    // test out of range
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def jobName = "testOutOfRange"
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName}
+                COLUMNS TERMINATED BY "|"
+                PROPERTIES
+                (
+                    "max_batch_interval" = "5",
+                    "max_batch_rows" = "300000",
+                    "max_batch_size" = "209715200"
+                )
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_partitions" = "0",
+                    "kafka_topic" = "multi_table_load_invalid_table",
+                    "kafka_offsets" = "100"
+                );
+            """
+            sql "sync"
+
+            def count = 0
+            while (true) {
+                sleep(1000)
+                def res = sql "show routine load for ${jobName}"
+                def state = res[0][8].toString()
+                log.info("routine load state: ${res[0][8].toString()}".toString())
+                log.info("routine load statistic: ${res[0][14].toString()}".toString())
+                log.info("reason of state changed: ${res[0][17].toString()}".toString())
+                if (state != "PAUSED") {
+                    count++
+                    if (count > 60) {
+                        assertEquals(1, 2)
+                    }
+                    continue;
+                }
+                log.info("reason of state changed: ${res[0][17].toString()}".toString())
+                assertTrue(res[0][17].toString().contains("is greater than kafka latest offset"))
+                break;
+            }
+        } finally {
+            sql "stop routine load for ${jobName}"
+        }
+    }
 }
\ No newline at end of file
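The test depends on partition 0 of the topic holding fewer than 100 records, so the configured starting offset is out of range from the first scheduling pass and the job pauses with the new error message. The same precondition can be verified against Kafka directly before creating a job; a small sketch using the standard kafka-clients consumer API (the broker address is a placeholder, the topic and offset mirror the test, and this is not the code path Doris itself uses to fetch latest offsets):

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class OffsetPrecheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092"); // placeholder broker
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            props.put("group.id", "offset-precheck");

            long requestedOffset = 100L; // mirrors "kafka_offsets" = "100"
            TopicPartition tp = new TopicPartition("multi_table_load_invalid_table", 0);
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // endOffsets returns, per partition, the offset of the next record
                // to be written, i.e. the "latest offset" the FE caches.
                Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));
                long latest = end.get(tp);
                if (requestedOffset > latest) {
                    System.out.printf("offset %d is greater than kafka latest offset %d%n",
                            requestedOffset, latest);
                }
            }
        }
    }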