From 320f1e9bbf6d10d0aa5db00eeb32c073dfa7078a Mon Sep 17 00:00:00 2001
From: camby
Date: Fri, 15 Sep 2023 10:22:47 +0800
Subject: [PATCH] [improve](routineload) improve show routine load output
 (#24264)

---
 .../Show-Statements/SHOW-ROUTINE-LOAD-TASK.md |  2 +-
 .../Show-Statements/SHOW-ROUTINE-LOAD-TASK.md |  2 +-
 .../doris/analysis/CreateRoutineLoadStmt.java |  1 +
 .../load/routineload/RoutineLoadJob.java      | 29 +++++++++++--------
 4 files changed, 20 insertions(+), 14 deletions(-)

diff --git a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md
index 359f5018fb..f144ff30a2 100644
--- a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md
+++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md
@@ -61,7 +61,7 @@ DataSourceProperties: {"0":19}
 - `JobId`: The job ID corresponding to the subtask.
 - `CreateTime`: The creation time of the subtask.
 - `ExecuteStartTime`: The time when the subtask is scheduled to be executed, usually later than the creation time.
-- `Timeout`: Subtask timeout, usually twice the `MaxIntervalS` set by the job.
+- `Timeout`: Subtask timeout, usually twice the `max_batch_interval` set by the job.
 - `BeId`: The ID of the BE node executing this subtask.
 - `DataSourceProperties`: The starting offset of the Kafka Partition that the subtask is ready to consume. is a Json format string. Key is Partition Id. Value is the starting offset of consumption.
 
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md
index b9f4c610ff..c8a7c0f5bd 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-ROUTINE-LOAD-TASK.md
@@ -59,7 +59,7 @@ DataSourceProperties: {"0":19}
 - `JobId`:子任务对应的作业 ID。
 - `CreateTime`:子任务的创建时间。
 - `ExecuteStartTime`:子任务被调度执行的时间,通常晚于创建时间。
-- `Timeout`:子任务超时时间,通常是作业设置的 `MaxIntervalS` 的两倍。
+- `Timeout`:子任务超时时间,通常是作业设置的 `max_batch_interval` 的两倍。
 - `BeId`:执行这个子任务的 BE 节点 ID。
 - `DataSourceProperties`:子任务准备消费的 Kafka Partition 的起始 offset。是一个 Json 格式字符串。Key 为 Partition Id。Value 为消费的起始 offset。
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 61b72ebb64..94655cbb95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -92,6 +92,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
 
     // routine load properties
     public static final String DESIRED_CONCURRENT_NUMBER_PROPERTY = "desired_concurrent_number";
+    public static final String CURRENT_CONCURRENT_NUMBER_PROPERTY = "current_concurrent_number";
     // max error number in ten thousand records
     public static final String MAX_ERROR_NUMBER_PROPERTY = "max_error_number";
     public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 2d1e9de994..46183588df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1584,21 +1584,26 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         jobProperties.put("precedingFilter", precedingFilter == null ? STAR_STRING : precedingFilter.toSql());
         jobProperties.put("whereExpr", whereExpr == null ? STAR_STRING : whereExpr.toSql());
         if (getFormat().equalsIgnoreCase("json")) {
-            jobProperties.put("dataFormat", "json");
+            jobProperties.put(PROPS_FORMAT, "json");
         } else {
-            jobProperties.put("columnSeparator", columnSeparator == null ? "\t" : columnSeparator.toString());
-            jobProperties.put("lineDelimiter", lineDelimiter == null ? "\n" : lineDelimiter.toString());
+            jobProperties.put(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR,
+                    columnSeparator == null ? "\t" : columnSeparator.toString());
+            jobProperties.put(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER,
+                    lineDelimiter == null ? "\n" : lineDelimiter.toString());
         }
         jobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate));
-        jobProperties.put("maxErrorNum", String.valueOf(maxErrorNum));
-        jobProperties.put("maxBatchIntervalS", String.valueOf(maxBatchIntervalS));
-        jobProperties.put("maxBatchRows", String.valueOf(maxBatchRows));
-        jobProperties.put("maxBatchSizeBytes", String.valueOf(maxBatchSizeBytes));
-        jobProperties.put("currentTaskConcurrentNum", String.valueOf(currentTaskConcurrentNum));
-        jobProperties.put("desireTaskConcurrentNum", String.valueOf(desireTaskConcurrentNum));
-        jobProperties.put("execMemLimit", String.valueOf(execMemLimit));
-        jobProperties.put("mergeType", mergeType.toString());
-        jobProperties.put("deleteCondition", deleteCondition == null ? STAR_STRING : deleteCondition.toSql());
+        jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, String.valueOf(maxErrorNum));
+        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY, String.valueOf(maxBatchIntervalS));
+        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, String.valueOf(maxBatchRows));
+        jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, String.valueOf(maxBatchSizeBytes));
+        jobProperties.put(CreateRoutineLoadStmt.CURRENT_CONCURRENT_NUMBER_PROPERTY,
+                String.valueOf(currentTaskConcurrentNum));
+        jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY,
+                String.valueOf(desireTaskConcurrentNum));
+        jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, String.valueOf(execMemLimit));
+        jobProperties.put(LoadStmt.KEY_IN_PARAM_MERGE_TYPE, mergeType.toString());
+        jobProperties.put(LoadStmt.KEY_IN_PARAM_DELETE_CONDITION,
+                deleteCondition == null ? STAR_STRING : deleteCondition.toSql());
         jobProperties.putAll(this.jobProperties);
         Gson gson = new GsonBuilder().disableHtmlEscaping().create();
         return gson.toJson(jobProperties);
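
Note (illustration, not part of the patch): the net effect of the RoutineLoadJob hunk is that SHOW ROUTINE LOAD reports its job properties under the same names a user writes in CREATE ROUTINE LOAD. The sketch below is a minimal, standalone approximation of that serialization step, not the actual FE code path; the literal keys are assumed to equal the values of the CreateRoutineLoadStmt constants referenced above (the patch itself shows "desired_concurrent_number", "current_concurrent_number" and "max_error_number", and the docs change shows "max_batch_interval"), and the sample values are made up.

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import java.util.LinkedHashMap;
import java.util.Map;

// Standalone sketch of the SHOW ROUTINE LOAD property serialization, using
// the same Gson setup as the hunk above. Keys and values are illustrative.
public class ShowRoutineLoadJsonSketch {
    public static void main(String[] args) {
        Map<String, String> jobProperties = new LinkedHashMap<>();
        jobProperties.put("desired_concurrent_number", "5");
        jobProperties.put("current_concurrent_number", "3");
        jobProperties.put("max_error_number", "0");
        jobProperties.put("max_batch_interval", "20");

        // disableHtmlEscaping() keeps characters such as '=', '<' and '&'
        // from being emitted as \u003d, \u003c, \u0026 in the JSON string.
        Gson gson = new GsonBuilder().disableHtmlEscaping().create();
        System.out.println(gson.toJson(jobProperties));
        // prints: {"desired_concurrent_number":"5","current_concurrent_number":"3",
        //          "max_error_number":"0","max_batch_interval":"20"}
    }
}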