diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 7b0fef4a3d..c02bce86b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -55,6 +55,7 @@ import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; @@ -71,7 +72,9 @@ import org.apache.doris.cooldown.CooldownDelete; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.load.routineload.ErrorReason; import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.load.routineload.RoutineLoadJob.JobState; import org.apache.doris.master.MasterImpl; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -1873,6 +1876,14 @@ public class FrontendServiceImpl implements FrontendService.Iface { status = new TStatus(TStatusCode.ANALYSIS_ERROR); status.addToErrorMsgs(exception.getMessage()); result.setStatus(status); + try { + RoutineLoadJob routineLoadJob = Env.getCurrentEnv().getRoutineLoadManager() + .getRoutineLoadJobByMultiLoadTaskTxnId(request.getTxnId()); + routineLoadJob.updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR, + "failed to get stream load plan, " + exception.getMessage()), false); + } catch (UserException e) { + LOG.warn("catch update routine load job error.", e); + } return result; } long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000; diff --git a/regression-test/suites/load_p0/routine_load/data/multi_table_load_invalid_table.csv b/regression-test/suites/load_p0/routine_load/data/multi_table_load_invalid_table.csv new file mode 100644 index 0000000000..ff0f0d1b06 --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/data/multi_table_load_invalid_table.csv @@ -0,0 +1 @@ +routine_load_invalid_table|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, "name": "Alice", "grade": 9, "subjects": ["math", "science", "history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{} \ No newline at end of file diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy new file mode 100644 index 0000000000..b04610740d --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.ProducerConfig + +suite("test_routine_load_error","p0") { + def kafkaCsvTpoics = [ + "multi_table_load_invalid_table", + ] + + String enabled = context.config.otherConfigs.get("enableKafkaTest") + String kafka_port = context.config.otherConfigs.get("kafka_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + def kafka_broker = "${externalEnvIp}:${kafka_port}" + + if (enabled != null && enabled.equalsIgnoreCase("true")) { + // define kafka + def props = new Properties() + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + // Create kafka producer + def producer = new KafkaProducer<>(props) + + for (String kafkaCsvTopic in kafkaCsvTpoics) { + def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text + def lines = txt.readLines() + lines.each { line -> + logger.info("=====${line}========") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record) + } + } + } + + def i = 0 + if (enabled != null && enabled.equalsIgnoreCase("true")) { + try { + sql """ + CREATE ROUTINE LOAD testTableNoExist + COLUMNS TERMINATED BY "|" + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "multi_table_load_invalid_table", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + + def count = 0 + while (true) { + sleep(1000) + def res = sql "show routine load for testTableNoExist" + def state = res[0][8].toString() + if (state != "PAUSED") { + count++ + if (count > 60) { + assertEquals(1, 2) + } + continue; + } + log.info("reason of state changed: ${res[0][17].toString()}".toString()) + assertEquals(res[0][17].toString(), "ErrorReason{code=errCode = 100, msg='failed to get stream load plan, errCode = 7, detailMessage = table not found, table name is routine_load_invalid_table'}") + break; + } + } finally { + sql "stop routine load for testTableNoExist" + } + } +} \ No newline at end of file