From be89f0f77e5dcc8d8783184bd418bca9ab3739df Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Thu, 18 Nov 2021 14:31:16 +0800 Subject: [PATCH] [feat-opt](routine-load) Support show offset lag in show routine load stmt (#7114) Add a new field `Lag` to the result of the `show routine load` stmt. `Lag: {"0":10, "1":0}` means Kafka partition 0 is 10 messages behind and partition 1 is up-to-date. --- .../Data Manipulation/SHOW ROUTINE LOAD.md | 45 +++++++++++++++++++ .../Data Manipulation/SHOW ROUTINE LOAD.md | 45 +++++++++++++++++++ .../doris/analysis/ShowRoutineLoadStmt.java | 1 + .../common/proc/RoutineLoadsNameProcDir.java | 3 +- .../doris/common/util/PropertyAnalyzer.java | 1 - .../doris/load/routineload/KafkaProgress.java | 21 +++++++++ .../load/routineload/KafkaRoutineLoadJob.java | 7 +++ .../load/routineload/RoutineLoadJob.java | 6 ++- 8 files changed, 126 insertions(+), 3 deletions(-) diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW ROUTINE LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW ROUTINE LOAD.md index 68284d900a..7541fcaea0 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW ROUTINE LOAD.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW ROUTINE LOAD.md @@ -25,6 +25,51 @@ under the License. 
--> # SHOW ROUTINE LOAD +## description + This statement is used to show the running status of the Routine Load job + Syntax: + SHOW [ALL] ROUTINE LOAD [FOR jobName]; + + Result description: + + Id: Job ID + Name: job name + CreateTime: Job creation time + PauseTime: Last job pause time + EndTime: The end time of the job + DbName: corresponding database name + TableName: Corresponding table name + State: job running status + DataSourceType: Data source type: KAFKA + CurrentTaskNum: current number of subtasks + JobProperties: Job configuration details +DataSourceProperties: Data source configuration details + CustomProperties: custom configuration + Statistic: job running status statistics + Progress: Job running progress + Lag: Job consumption lag +ReasonOfStateChanged: Reason of job status change + ErrorLogUrls: URLs for viewing the data that was filtered out for failing quality checks + OtherMsg: Other error messages + + * State + + There are the following 5 states: + + * NEED_SCHEDULE: The job is waiting to be scheduled + * RUNNING: The job is running + * PAUSED: The job is suspended + * STOPPED: The job has ended + * CANCELLED: The job has been cancelled + + * Progress + + For Kafka data sources, the offset currently consumed by each partition is displayed. For example, {"0":"2"} means that the consumption progress of Kafka partition 0 is 2. + + * Lag + + For Kafka data sources, the consumption lag of each partition is displayed, i.e. the number of messages that have not been consumed yet. For example, {"0":10} means that Kafka partition 0 is 10 messages behind. A value of -1 means that the latest offset of the partition has not been cached yet. + ## example 1. Show all routine import jobs named test 1 (including stopped or cancelled jobs). The result is one or more lines. 
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW ROUTINE LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW ROUTINE LOAD.md index 43d6a2a323..9b6e2e3ada 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW ROUTINE LOAD.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW ROUTINE LOAD.md @@ -25,6 +25,51 @@ under the License. --> # SHOW ROUTINE LOAD +## description + 该语句用于展示 Routine Load 作业运行状态 + 语法: + SHOW [ALL] ROUTINE LOAD [FOR jobName]; + + 结果说明: + + Id: 作业ID + Name: 作业名称 + CreateTime: 作业创建时间 + PauseTime: 最近一次作业暂停时间 + EndTime: 作业结束时间 + DbName: 对应数据库名称 + TableName: 对应表名称 + State: 作业运行状态 + DataSourceType: 数据源类型:KAFKA + CurrentTaskNum: 当前子任务数量 + JobProperties: 作业配置详情 +DataSourceProperties: 数据源配置详情 + CustomProperties: 自定义配置 + Statistic: 作业运行状态统计信息 + Progress: 作业运行进度 + Lag: 作业延迟状态 +ReasonOfStateChanged: 作业状态变更的原因 + ErrorLogUrls: 被过滤的质量不合格的数据的查看地址 + OtherMsg: 其他错误信息 + + * State + + 有以下5种State: + + * NEED_SCHEDULE:作业等待被调度 + * RUNNING:作业运行中 + * PAUSED:作业被暂停 + * STOPPED:作业已结束 + * CANCELLED:作业已取消 + + * Progress + + 对于Kafka数据源,显示每个分区当前已消费的offset。如 {"0":"2"} 表示Kafka分区0的消费进度为2。 + + * Lag + + 对于Kafka数据源,显示每个分区的消费延迟。如{"0":10} 表示Kafka分区0的消费延迟为10。 + ## example 1. 
展示名称为 test1 的所有例行导入作业(包括已停止或取消的作业)。结果为一行或多行。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java index 262f06f8d8..ac270ae122 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowRoutineLoadStmt.java @@ -81,6 +81,7 @@ public class ShowRoutineLoadStmt extends ShowStmt { .add("CustomProperties") .add("Statistic") .add("Progress") + .add("Lag") .add("ReasonOfStateChanged") .add("ErrorLogUrls") .add("OtherMsg") diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RoutineLoadsNameProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RoutineLoadsNameProcDir.java index b6b03090d2..7f5fa2b508 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/RoutineLoadsNameProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/RoutineLoadsNameProcDir.java @@ -17,13 +17,14 @@ package org.apache.doris.common.proc; -import com.google.common.base.Preconditions; import org.apache.doris.analysis.ShowRoutineLoadStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.load.routineload.RoutineLoadManager; +import com.google.common.base.Preconditions; + import java.util.List; /* diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 4673b16e01..68da40765f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -514,5 +514,4 @@ public class PropertyAnalyzer { } return replicaAlloc; } - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java index 6c88b5b952..8f5f957892 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java @@ -149,6 +149,27 @@ public class KafkaProgress extends RoutineLoadProgress { return pairs; } + // Get the lag of each kafka partition. + // the `partitionIdWithLatestOffsets` is the cached latest offsets of each partition, + // which is periodically updated as job is running. + // The latest offset saved in `partitionIdWithLatestOffsets` is the next offset of the partition, + // And offset saved in `partitionIdToOffset` is the next offset to be consumed. + // For example, if a partition has 4 msg with offsets: 0,1,2,3 + // The latest offset is 4, and offset to be consumed is 2, + // so the lag should be (4-2=)2. + public Map getLag(Map partitionIdWithLatestOffsets) { + Map lagMap = Maps.newHashMap(); + for (Map.Entry entry : partitionIdToOffset.entrySet()) { + if (partitionIdWithLatestOffsets.containsKey(entry.getKey())) { + long lag = partitionIdWithLatestOffsets.get(entry.getKey()) - entry.getValue(); + lagMap.put(entry.getKey(), lag); + } else { + lagMap.put(entry.getKey(), -1L); + } + } + return lagMap; + } + @Override public String toString() { Map showPartitionIdToOffset = Maps.newHashMap(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 1be8030f4c..316569fffa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -692,4 +692,11 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id); return false; } + + @Override + protected String 
getLag() { + Map partitionIdToOffsetLag = ((KafkaProgress) progress).getLag(cachedPartitionWithLatestOffsets); + Gson gson = new Gson(); + return gson.toJson(partitionIdToOffsetLag); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index a45b5818c3..470c99bea9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -397,7 +397,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } public void setOtherMsg(String otherMsg) { - this.otherMsg = Strings.nullToEmpty(otherMsg); + this.otherMsg = TimeUtils.getCurrentFormatTime() + ":" + Strings.nullToEmpty(otherMsg); } public String getDbFullName() throws MetaNotFoundException { @@ -1253,10 +1253,13 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl protected abstract String getStatistic(); + protected abstract String getLag(); + public List getShowInfo() { Optional database = Catalog.getCurrentCatalog().getDb(dbId); Optional table = database.flatMap(db -> db.getTable(tableId)); + readLock(); try { List row = Lists.newArrayList(); @@ -1275,6 +1278,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl row.add(customPropertiesJsonToString()); row.add(getStatistic()); row.add(getProgress().toJsonString()); + row.add(getLag()); switch (state) { case PAUSED: row.add(pauseReason == null ? "" : pauseReason.toString());