[Feature] (binlog) Add getBinlogLag (#21637)

Signed-off-by: Jack Drogon <jack.xsuperman@gmail.com>
This commit is contained in:
Jack Drogon
2023-07-08 07:41:45 +08:00
committed by GitHub
parent 499592178e
commit 51b0bbb667
6 changed files with 183 additions and 11 deletions

View File

@ -141,7 +141,7 @@ public class BinlogManager {
}
// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long commitSeq) {
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
try {
@ -152,7 +152,25 @@ public class BinlogManager {
return Pair.of(status, null);
}
return dbBinlog.getBinlog(tableId, commitSeq);
return dbBinlog.getBinlog(tableId, prevCommitSeq);
} finally {
lock.readLock().unlock();
}
}
// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, Long> getBinlogLag(long dbId, long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
try {
DBBinlog dbBinlog = dbBinlogMap.get(dbId);
if (dbBinlog == null) {
status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_DB);
LOG.warn("dbBinlog not found. dbId: {}", dbId);
return Pair.of(status, null);
}
return dbBinlog.getBinlogLag(tableId, prevCommitSeq);
} finally {
lock.readLock().unlock();
}

View File

@ -25,22 +25,22 @@ import org.apache.doris.thrift.TStatusCode;
import java.util.TreeSet;
public class BinlogUtils {
public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, long commitSeq) {
public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
TBinlog firstBinlog = binlogs.first();
// all commitSeq > commitSeq
if (firstBinlog.getCommitSeq() > commitSeq) {
if (firstBinlog.getCommitSeq() > prevCommitSeq) {
status.setStatusCode(TStatusCode.BINLOG_TOO_OLD_COMMIT_SEQ);
return Pair.of(status, firstBinlog);
}
// find first binlog whose commitSeq > commitSeq
TBinlog guard = new TBinlog();
guard.setCommitSeq(commitSeq);
guard.setCommitSeq(prevCommitSeq);
TBinlog binlog = binlogs.higher(guard);
// all commitSeq <= commitSeq
// all commitSeq <= prevCommitSeq
if (binlog == null) {
status.setStatusCode(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ);
return Pair.of(status, null);
@ -48,4 +48,25 @@ public class BinlogUtils {
return Pair.of(status, binlog);
}
}
public static Pair<TStatus, Long> getBinlogLag(TreeSet<TBinlog> binlogs, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
TBinlog firstBinlog = binlogs.first();
if (firstBinlog.getCommitSeq() > prevCommitSeq) {
return Pair.of(status, Long.valueOf(binlogs.size()));
}
// find first binlog whose commitSeq > commitSeq
TBinlog guard = new TBinlog();
guard.setCommitSeq(prevCommitSeq);
TBinlog binlog = binlogs.higher(guard);
// all prevCommitSeq <= commitSeq
if (binlog == null) {
return Pair.of(status, 0L);
} else {
return Pair.of(status, Long.valueOf(binlogs.tailSet(binlog).size()));
}
}
}

View File

@ -100,7 +100,7 @@ public class DBBinlog {
return dbId;
}
public Pair<TStatus, TBinlog> getBinlog(long tableId, long commitSeq) {
public Pair<TStatus, TBinlog> getBinlog(long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
try {
@ -110,10 +110,29 @@ public class DBBinlog {
status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_TABLE);
return Pair.of(status, null);
}
return tableBinlog.getBinlog(commitSeq);
return tableBinlog.getBinlog(prevCommitSeq);
}
return BinlogUtils.getBinlog(allBinlogs, commitSeq);
return BinlogUtils.getBinlog(allBinlogs, prevCommitSeq);
} finally {
lock.readLock().unlock();
}
}
public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
try {
if (tableId >= 0) {
TableBinlog tableBinlog = tableBinlogMap.get(tableId);
if (tableBinlog == null) {
status.setStatusCode(TStatusCode.BINLOG_NOT_FOUND_TABLE);
return Pair.of(status, null);
}
return tableBinlog.getBinlogLag(prevCommitSeq);
}
return BinlogUtils.getBinlogLag(allBinlogs, prevCommitSeq);
} finally {
lock.readLock().unlock();
}

View File

@ -67,10 +67,19 @@ public class TableBinlog {
}
}
public Pair<TStatus, TBinlog> getBinlog(long commitSeq) {
public Pair<TStatus, TBinlog> getBinlog(long prevCommitSeq) {
lock.readLock().lock();
try {
return BinlogUtils.getBinlog(binlogs, commitSeq);
return BinlogUtils.getBinlog(binlogs, prevCommitSeq);
} finally {
lock.readLock().unlock();
}
}
public Pair<TStatus, Long> getBinlogLag(long prevCommitSeq) {
lock.readLock().lock();
try {
return BinlogUtils.getBinlogLag(binlogs, prevCommitSeq);
} finally {
lock.readLock().unlock();
}

View File

@ -106,6 +106,7 @@ import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TFrontendPingFrontendRequest;
import org.apache.doris.thrift.TFrontendPingFrontendResult;
import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
import org.apache.doris.thrift.TGetBinlogLagResult;
import org.apache.doris.thrift.TGetBinlogRequest;
import org.apache.doris.thrift.TGetBinlogResult;
import org.apache.doris.thrift.TGetDbsParams;
@ -2572,4 +2573,99 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return result;
}
// getBinlogLag
public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws TException {
String clientAddr = getClientAddrAsString();
LOG.debug("receive get binlog request: {}", request);
TGetBinlogLagResult result = new TGetBinlogLagResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
try {
result = getBinlogLagImpl(request, clientAddr);
} catch (UserException e) {
LOG.warn("failed to get binlog: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
private TGetBinlogLagResult getBinlogLagImpl(TGetBinlogRequest request, String clientIp) throws UserException {
/// Check all required arg: user, passwd, db, prev_commit_seq
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
if (!request.isSetPasswd()) {
throw new UserException("passwd is not set");
}
if (!request.isSetDb()) {
throw new UserException("db is not set");
}
if (!request.isSetPrevCommitSeq()) {
throw new UserException("prev_commit_seq is not set");
}
// step 1: check auth
String cluster = request.getCluster();
if (Strings.isNullOrEmpty(cluster)) {
cluster = SystemInfoService.DEFAULT_CLUSTER;
}
if (Strings.isNullOrEmpty(request.getToken())) {
checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTable(),
request.getUserIp(), PrivPredicate.SELECT);
}
// step 3: check database
Env env = Env.getCurrentEnv();
String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb());
Database db = env.getInternalCatalog().getDbNullable(fullDbName);
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
dbName = request.getDb();
}
throw new UserException("unknown database, database=" + dbName);
}
// step 4: fetch all tableIds
// lookup tables && convert into tableIdList
long tableId = -1;
if (request.isSetTableId()) {
tableId = request.getTableId();
} else if (request.isSetTable()) {
String tableName = request.getTable();
Table table = db.getTableOrMetaException(tableName, TableType.OLAP);
if (table == null) {
throw new UserException("unknown table, table=" + tableName);
}
tableId = table.getId();
}
// step 6: get binlog
long dbId = db.getId();
TGetBinlogLagResult result = new TGetBinlogLagResult();
result.setStatus(new TStatus(TStatusCode.OK));
long prevCommitSeq = request.getPrevCommitSeq();
Pair<TStatus, Long> statusLagPair = env.getBinlogManager().getBinlogLag(dbId, tableId, prevCommitSeq);
TStatus status = statusLagPair.first;
if (status != null && status.getStatusCode() != TStatusCode.OK) {
result.setStatus(status);
}
Long binlogLag = statusLagPair.second;
if (binlogLag != null) {
result.setLag(binlogLag);
}
return result;
}
}

View File

@ -1046,6 +1046,13 @@ struct TGetMasterTokenResult {
2: optional string token
}
typedef TGetBinlogRequest TGetBinlogLagRequest
struct TGetBinlogLagResult {
1: optional Status.TStatus status
2: optional i64 lag
}
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@ -1108,4 +1115,6 @@ service FrontendService {
TGetTabletReplicaInfosResult getTabletReplicaInfos(1: TGetTabletReplicaInfosRequest request)
TGetMasterTokenResult getMasterToken(1: TGetMasterTokenRequest request)
TGetBinlogLagResult getBinlogLag(1: TGetBinlogLagRequest request)
}