[feat-opt](routine-load) Support show offset lag in show routine load stmt (#7114)

Add a new field `Lag` in result of `show routine load` stmt.

`Lag: {"0":10, "1":0}` means kafka partition 0 has 10 msg behind and partition 1 is update-to-date.
This commit is contained in:
Mingyu Chen
2021-11-18 14:31:16 +08:00
committed by GitHub
parent 74e8264c48
commit be89f0f77e
8 changed files with 126 additions and 3 deletions

View File

@ -25,6 +25,51 @@ under the License.
-->
# SHOW ROUTINE LOAD
## description
This statement is used to show the running status of the Routine Load job
grammar:
SHOW [ALL] ROUTINE LOAD [FOR jobName];
Result description:
Id: Job ID
Name: job name
CreateTime: Job creation time
PauseTime: Last job pause time
EndTime: The end time of the job
DbName: corresponding database name
TableName: Corresponding table name
State: job running status
DataSourceType: Data source type: KAFKA
CurrentTaskNum: current number of subtasks
JobProperties: Job configuration details
DataSourceProperties: Data source configuration details
CustomProperties: custom configuration
Statistic: job running status statistics
Progress: Job running progress
Lag: job delay status
ReasonOfStateChanged: Reason of job status change
ErrorLogUrls: The viewing address of the filtered data with unqualified quality
OtherMsg: Other error messages
* State
There are the following 4 states:
* NEED_SCHEDULE: The job is waiting to be scheduled
* RUNNING: The job is running
* PAUSED: The job is suspended
* STOPPED: The job has ended
* CANCELLED: The job has been cancelled
* Progress
For Kafka data sources, the offset currently consumed by each partition is displayed. For example, {"0":"2"} means that the consumption progress of Kafka partition 0 is 2.
* Lag
For Kafka data sources, the consumption delay of each partition is displayed. For example, {"0":10} means that the consumption delay of Kafka partition 0 is 10.
## example
1. Show all routine import jobs named test 1 (including stopped or cancelled jobs). The result is one or more lines.

View File

@ -25,6 +25,51 @@ under the License.
-->
# SHOW ROUTINE LOAD
## description
该语句用于展示 Routine Load 作业运行状态
语法:
SHOW [ALL] ROUTINE LOAD [FOR jobName];
结果说明:
Id: 作业ID
Name: 作业名称
CreateTime: 作业创建时间
PauseTime: 最近一次作业暂停时间
EndTime: 作业结束时间
DbName: 对应数据库名称
TableName: 对应表名称
State: 作业运行状态
DataSourceType: 数据源类型:KAFKA
CurrentTaskNum: 当前子任务数量
JobProperties: 作业配置详情
DataSourceProperties: 数据源配置详情
CustomProperties: 自定义配置
Statistic: 作业运行状态统计信息
Progress: 作业运行进度
Lag: 作业延迟状态
ReasonOfStateChanged: 作业状态变更的原因
ErrorLogUrls: 被过滤的质量不合格的数据的查看地址
OtherMsg: 其他错误信息
* State
有以下4种State:
* NEED_SCHEDULE:作业等待被调度
* RUNNING:作业运行中
* PAUSED:作业被暂停
* STOPPED:作业已结束
* CANCELLED:作业已取消
* Progress
对于Kafka数据源,显示每个分区当前已消费的offset。如 {"0":"2"} 表示Kafka分区0的消费进度为2。
* Lag
对于Kafka数据源,显示每个分区的消费延迟。如{"0":10} 表示Kafka分区0的消费延迟为10。
## example
1. 展示名称为 test1 的所有例行导入作业(包括已停止或取消的作业)。结果为一行或多行。

View File

@ -81,6 +81,7 @@ public class ShowRoutineLoadStmt extends ShowStmt {
.add("CustomProperties")
.add("Statistic")
.add("Progress")
.add("Lag")
.add("ReasonOfStateChanged")
.add("ErrorLogUrls")
.add("OtherMsg")

View File

@ -17,13 +17,14 @@
package org.apache.doris.common.proc;
import com.google.common.base.Preconditions;
import org.apache.doris.analysis.ShowRoutineLoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.RoutineLoadManager;
import com.google.common.base.Preconditions;
import java.util.List;
/*

View File

@ -514,5 +514,4 @@ public class PropertyAnalyzer {
}
return replicaAlloc;
}
}

View File

@ -149,6 +149,27 @@ public class KafkaProgress extends RoutineLoadProgress {
return pairs;
}
// Get the lag of each kafka partition.
// the `partitionIdWithLatestOffsets` is the cached latest offsets of each partition,
// which is periodically updated as job is running.
// The latest offset saved in `partitionIdWithLatestOffsets` is the next offset of the partition,
// And offset saved in `partitionIdToOffset` is the next offset to be consumed.
// For example, if a partition has 4 msg with offsets: 0,1,2,3
// The latest offset is 4, and offset to be consumed is 2,
// so the lag should be (4-2=)2.
public Map<Integer, Long> getLag(Map<Integer, Long> partitionIdWithLatestOffsets) {
Map<Integer, Long> lagMap = Maps.newHashMap();
for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
if (partitionIdWithLatestOffsets.containsKey(entry.getKey())) {
long lag = partitionIdWithLatestOffsets.get(entry.getKey()) - entry.getValue();
lagMap.put(entry.getKey(), lag);
} else {
lagMap.put(entry.getKey(), -1L);
}
}
return lagMap;
}
@Override
public String toString() {
Map<Integer, String> showPartitionIdToOffset = Maps.newHashMap();

View File

@ -692,4 +692,11 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id);
return false;
}
@Override
protected String getLag() {
Map<Integer, Long> partitionIdToOffsetLag = ((KafkaProgress) progress).getLag(cachedPartitionWithLatestOffsets);
Gson gson = new Gson();
return gson.toJson(partitionIdToOffsetLag);
}
}

View File

@ -397,7 +397,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
public void setOtherMsg(String otherMsg) {
this.otherMsg = Strings.nullToEmpty(otherMsg);
this.otherMsg = TimeUtils.getCurrentFormatTime() + ":" + Strings.nullToEmpty(otherMsg);
}
public String getDbFullName() throws MetaNotFoundException {
@ -1253,10 +1253,13 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
protected abstract String getStatistic();
protected abstract String getLag();
public List<String> getShowInfo() {
Optional<Database> database = Catalog.getCurrentCatalog().getDb(dbId);
Optional<Table> table = database.flatMap(db -> db.getTable(tableId));
readLock();
try {
List<String> row = Lists.newArrayList();
@ -1275,6 +1278,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
row.add(customPropertiesJsonToString());
row.add(getStatistic());
row.add(getProgress().toJsonString());
row.add(getLag());
switch (state) {
case PAUSED:
row.add(pauseReason == null ? "" : pauseReason.toString());