From 93cfdffb75eb1df6dd245abb387aa596b92cb7e2 Mon Sep 17 00:00:00 2001
From: Guangdong Liu
Date: Thu, 23 Nov 2023 14:25:54 +0800
Subject: [PATCH] [regression test](routine test) add case for exec_mem_limit (#27308)

---
 .../doris/analysis/CreateRoutineLoadStmt.java |  3 +-
 .../routine_load/test_routine_load.groovy     | 41 +++++++++++++++++++
 2 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 3564e7f1e1..260f7f7b24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -488,7 +488,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
                 RoutineLoadJob.DEFAULT_STRICT_MODE,
                 LoadStmt.STRICT_MODE + " should be a boolean");
         execMemLimit = Util.getLongPropertyOrDefault(jobProperties.get(EXEC_MEM_LIMIT_PROPERTY),
-                RoutineLoadJob.DEFAULT_EXEC_MEM_LIMIT, EXEC_MEM_LIMIT_PRED, EXEC_MEM_LIMIT_PROPERTY + "should > 0");
+                RoutineLoadJob.DEFAULT_EXEC_MEM_LIMIT, EXEC_MEM_LIMIT_PRED,
+                EXEC_MEM_LIMIT_PROPERTY + " must be greater than 0");
         sendBatchParallelism = ((Long) Util.getLongPropertyOrDefault(jobProperties.get(SEND_BATCH_PARALLELISM),
                 ConnectContext.get().getSessionVariable().getSendBatchParallelism(), SEND_BATCH_PARALLELISM_PRED,
                 SEND_BATCH_PARALLELISM + " should > 0")).intValue();

diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
index 20009d25cb..0a76a0d863 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
@@ -397,6 +397,47 @@ suite("test_routine_load","p0") {
         }
     }
 
+    i = 0
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        try {
+            for (String tableName in tables) {
+                sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+                sql new File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
+
+                def name = "routine_load_" + tableName
+                try {
+                    sql """
+                        CREATE ROUTINE LOAD ${jobs[i]} ON ${name}
+                        COLUMNS(${columns[i]}),
+                        COLUMNS TERMINATED BY "|"
+                        PROPERTIES
+                        (
+                            "exec_mem_limit" = "test",
+                            "max_batch_interval" = "5",
+                            "max_batch_rows" = "300000",
+                            "max_batch_size" = "209715200"
+                        )
+                        FROM KAFKA
+                        (
+                            "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                            "kafka_topic" = "${topics[i]}",
+                            "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                        );
+                    """
+                    sql "sync"
+                    i++
+                } catch (Exception e) {
+                    log.info("exception: ${e.toString()}".toString())
+                    assertEquals(e.toString(), "java.sql.SQLException: errCode = 2, detailMessage = exec_mem_limit must be greater than 0")
+                }
+            }
+        } finally {
+            for (String tableName in tables) {
+                sql new File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+            }
+        }
+    }
+
     // timezone
     i = 0
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
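
A minimal client-side sketch of the failure this test asserts, assuming hypothetical names (example_db, example_job, example_tbl, example_topic) and a placeholder broker address; only the error text comes from the patch above:

    -- hypothetical database/job/table/topic names; broker address is a placeholder
    CREATE ROUTINE LOAD example_db.example_job ON example_tbl
    COLUMNS TERMINATED BY "|"
    PROPERTIES
    (
        "exec_mem_limit" = "test"  -- non-numeric, rejected by the exec_mem_limit check
    )
    FROM KAFKA
    (
        "kafka_broker_list" = "127.0.0.1:9092",
        "kafka_topic" = "example_topic"
    );
    -- expected to fail with: errCode = 2, detailMessage = exec_mem_limit must be greater than 0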