branch-2.1: [improvement](ccr) Add and adjust result for get_lag #48953 (#49055)

Cherry-picked from #48953

Co-authored-by: Uniqueyou <wangyixuan@selectdb.com>
This commit is contained in:
github-actions[bot]
2025-03-17 10:50:07 +08:00
committed by GitHub
parent e7d4dda3c8
commit cf88db4938
4 changed files with 34 additions and 14 deletions

View File

@ -23,13 +23,18 @@ public class BinlogLagInfo {
private long lastCommitSeq;
private long firstCommitTs;
private long lastCommitTs;
private long nextCommitSeq;
private long nextCommitTs;
public BinlogLagInfo(long lag, long firstCommitSeq, long lastCommitSeq, long firstCommitTs, long lastCommitTs) {
public BinlogLagInfo(long lag, long firstCommitSeq, long lastCommitSeq, long firstCommitTs, long lastCommitTs,
long nextCommitSeq, long nextCommitTs) {
this.lag = lag;
this.firstCommitSeq = firstCommitSeq;
this.lastCommitSeq = lastCommitSeq;
this.firstCommitTs = firstCommitTs;
this.lastCommitTs = lastCommitTs;
this.nextCommitSeq = nextCommitSeq;
this.nextCommitTs = nextCommitTs;
}
public BinlogLagInfo() {
@ -38,6 +43,16 @@ public class BinlogLagInfo {
lastCommitSeq = 0;
firstCommitTs = 0;
lastCommitTs = 0;
nextCommitSeq = 0;
nextCommitTs = 0;
}
public long getNextCommitSeq() {
return nextCommitSeq;
}
public long getNextCommitTs() {
return nextCommitTs;
}
public long getLag() {

View File

@ -68,30 +68,31 @@ public class BinlogUtils {
if (firstBinlog.getCommitSeq() > prevCommitSeq) {
BinlogLagInfo lagInfo = new BinlogLagInfo(binlogs.size(), firstBinlog.getCommitSeq(),
lastBinlog.getCommitSeq(), firstBinlog.getTimestamp(), lastBinlog.getTimestamp());
lastBinlog.getCommitSeq(), firstBinlog.getTimestamp(), lastBinlog.getTimestamp(),
firstBinlog.getCommitSeq(), firstBinlog.getTimestamp());
return Pair.of(status, lagInfo);
}
// find first binlog whose commitSeq > commitSeq
TBinlog guard = new TBinlog();
guard.setCommitSeq(prevCommitSeq);
TBinlog binlog = binlogs.higher(guard);
TBinlog nextBinlog = binlogs.higher(guard);
// all prevCommitSeq <= commitSeq
long lastCommitSeq = lastBinlog.getCommitSeq();
long lastCommitTs = lastBinlog.getTimestamp();
long firstCommitSeq = firstBinlog.getCommitSeq();
long firstCommitTs = firstBinlog.getTimestamp();
long lag = 0;
long lastCommitSeq = 0;
long lastCommitTs = 0;
long firstCommitSeq = 0;
long firstCommitTs = 0;
if (binlog != null) {
lag = binlogs.tailSet(binlog).size();
firstCommitSeq = binlog.getCommitSeq();
firstCommitTs = binlog.getTimestamp();
lastCommitSeq = lastBinlog.getCommitSeq();
lastCommitTs = lastBinlog.getTimestamp();
long nextCommitSeq = 0;
long nextCommitTs = 0;
if (nextBinlog != null) {
lag = binlogs.tailSet(nextBinlog).size();
nextCommitSeq = nextBinlog.getCommitSeq();
nextCommitTs = nextBinlog.getTimestamp();
}
return Pair.of(status, new BinlogLagInfo(lag, firstCommitSeq, lastCommitSeq,
firstCommitTs, lastCommitTs));
firstCommitTs, lastCommitTs, nextCommitSeq, nextCommitTs));
}
public static TBinlog newDummyBinlog(long dbId, long tableId) {

View File

@ -3466,6 +3466,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
result.setLastCommitSeq(lagInfo.getLastCommitSeq());
result.setFirstBinlogTimestamp(lagInfo.getFirstCommitTs());
result.setLastBinlogTimestamp(lagInfo.getLastCommitTs());
result.setNextCommitSeq(lagInfo.getNextCommitSeq());
result.setNextBinlogTimestamp(lagInfo.getNextCommitTs());
}
return result;
}

View File

@ -1450,6 +1450,8 @@ struct TGetBinlogLagResult {
5: optional i64 last_commit_seq
6: optional i64 first_binlog_timestamp
7: optional i64 last_binlog_timestamp
8: optional i64 next_commit_seq
9: optional i64 next_binlog_timestamp
}
struct TUpdateFollowerStatsCacheRequest {