[chore](fe) Returns dropped tables in GetMeta request (#38541)
Cherry-pick #38019
This commit is contained in:
@ -369,6 +369,20 @@ public class BinlogManager {
|
||||
}
|
||||
}
|
||||
|
||||
// get the dropped tables of the db.
|
||||
public List<Long> getDroppedTables(long dbId) {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
DBBinlog dbBinlog = dbBinlogMap.get(dbId);
|
||||
if (dbBinlog == null) {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
return dbBinlog.getDroppedTables();
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public List<BinlogTombstone> gc() {
|
||||
LOG.info("begin gc binlog");
|
||||
|
||||
|
||||
@ -62,6 +62,8 @@ public class DBBinlog {
|
||||
|
||||
// The commit seq of the dropped partitions
|
||||
private List<Pair<Long, Long>> droppedPartitions;
|
||||
// The commit seq of the dropped tables
|
||||
private List<Pair<Long, Long>> droppedTables;
|
||||
|
||||
private List<TBinlog> tableDummyBinlogs;
|
||||
|
||||
@ -79,6 +81,7 @@ public class DBBinlog {
|
||||
tableBinlogMap = Maps.newHashMap();
|
||||
timestamps = Lists.newArrayList();
|
||||
droppedPartitions = Lists.newArrayList();
|
||||
droppedTables = Lists.newArrayList();
|
||||
|
||||
TBinlog dummy;
|
||||
if (binlog.getType() == TBinlogType.DUMMY) {
|
||||
@ -121,6 +124,11 @@ public class DBBinlog {
|
||||
if (info != null && info.getPartitionId() > 0) {
|
||||
droppedPartitions.add(Pair.of(info.getPartitionId(), binlog.getCommitSeq()));
|
||||
}
|
||||
} else if (binlog.getType() == TBinlogType.DROP_TABLE) {
|
||||
DropTableRecord record = DropTableRecord.fromJson(binlog.data);
|
||||
if (record != null && record.getTableId() > 0) {
|
||||
droppedTables.add(Pair.of(record.getTableId(), binlog.getCommitSeq()));
|
||||
}
|
||||
}
|
||||
|
||||
if (tableIds == null) {
|
||||
@ -174,6 +182,19 @@ public class DBBinlog {
|
||||
if (!binlog.isSetType()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
|
||||
long partitionId = ((DropPartitionInfo) raw).getPartitionId();
|
||||
if (partitionId > 0) {
|
||||
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
|
||||
}
|
||||
} else if (binlog.getType() == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) {
|
||||
long tableId = ((DropTableRecord) raw).getTableId();
|
||||
if (tableId > 0) {
|
||||
droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
|
||||
}
|
||||
}
|
||||
|
||||
switch (binlog.getType()) {
|
||||
case CREATE_TABLE:
|
||||
return;
|
||||
@ -183,13 +204,6 @@ public class DBBinlog {
|
||||
break;
|
||||
}
|
||||
|
||||
if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
|
||||
long partitionId = ((DropPartitionInfo) raw).getPartitionId();
|
||||
if (partitionId > 0) {
|
||||
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
|
||||
}
|
||||
}
|
||||
|
||||
for (long tableId : tableIds) {
|
||||
TableBinlog tableBinlog = getTableBinlog(binlog, tableId, dbBinlogEnable);
|
||||
if (tableBinlog != null) {
|
||||
@ -237,6 +251,18 @@ public class DBBinlog {
|
||||
}
|
||||
}
|
||||
|
||||
// Get the dropped tables of the db.
|
||||
public List<Long> getDroppedTables() {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
return droppedTables.stream()
|
||||
.map(v -> v.first)
|
||||
.collect(Collectors.toList());
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) {
|
||||
TStatus status = new TStatus(TStatusCode.OK);
|
||||
lock.readLock().lock();
|
||||
@ -354,7 +380,7 @@ public class DBBinlog {
|
||||
}
|
||||
}
|
||||
|
||||
gcDroppedPartitions(largestExpiredCommitSeq);
|
||||
gcDroppedPartitionAndTables(largestExpiredCommitSeq);
|
||||
if (lastCommitSeq != -1) {
|
||||
dummy.setCommitSeq(lastCommitSeq);
|
||||
}
|
||||
@ -392,7 +418,7 @@ public class DBBinlog {
|
||||
timeIter.remove();
|
||||
}
|
||||
|
||||
gcDroppedPartitions(lastExpiredBinlog.getCommitSeq());
|
||||
gcDroppedPartitionAndTables(lastExpiredBinlog.getCommitSeq());
|
||||
}
|
||||
|
||||
return lastExpiredBinlog;
|
||||
@ -502,11 +528,15 @@ public class DBBinlog {
|
||||
}
|
||||
}
|
||||
|
||||
private void gcDroppedPartitions(long commitSeq) {
|
||||
private void gcDroppedPartitionAndTables(long commitSeq) {
|
||||
Iterator<Pair<Long, Long>> iter = droppedPartitions.iterator();
|
||||
while (iter.hasNext() && iter.next().second < commitSeq) {
|
||||
iter.remove();
|
||||
}
|
||||
iter = droppedTables.iterator();
|
||||
while (iter.hasNext() && iter.next().second < commitSeq) {
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
|
||||
// not thread safety, do this without lock
|
||||
|
||||
@ -58,6 +58,10 @@ public class DropTableRecord {
|
||||
return GsonUtils.GSON.toJson(this);
|
||||
}
|
||||
|
||||
public static DropTableRecord fromJson(String json) {
|
||||
return GsonUtils.GSON.fromJson(json, DropTableRecord.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return toJson();
|
||||
|
||||
@ -6122,6 +6122,7 @@ public class Env {
|
||||
if (Config.enable_feature_binlog) {
|
||||
BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager();
|
||||
dbMeta.setDroppedPartitions(binlogManager.getDroppedPartitions(db.getId()));
|
||||
dbMeta.setDroppedTables(binlogManager.getDroppedTables(db.getId()));
|
||||
}
|
||||
|
||||
result.setDbMeta(dbMeta);
|
||||
|
||||
Reference in New Issue
Block a user