[Fix](binlog) Fix bugs in tombstone (#22031)

This commit is contained in:
DeadlineFen
2023-07-24 14:33:16 +08:00
committed by GitHub
parent b5f27b5349
commit 667e4ea99b
3 changed files with 83 additions and 84 deletions

View File

@ -36,33 +36,34 @@ public class BinlogTombstone {
@SerializedName(value = "commitSeq")
private long commitSeq;
// TODO(deadlinefen): delete this field later
// This is a reserved field for the transition between new and old versions.
// It will be deleted later
@SerializedName(value = "tableIds")
private List<Long> tableIds;
@SerializedName(value = "tableCommitSeqMap")
private Map<Long, Long> tableCommitSeqMap;
@SerializedName(value = "tableVersionMap")
// this map keep last upsert record <tableId, UpsertRecord>
// only for master fe to send be gc task, not need persist
private Map<Long, UpsertRecord.TableRecord> tableVersionMap = Maps.newHashMap();
public BinlogTombstone(long dbId, List<Long> tableIds, long commitSeq) {
this.dbBinlogTombstone = true;
public BinlogTombstone(long dbId, boolean isDbTombstone) {
this.dbBinlogTombstone = isDbTombstone;
this.dbId = dbId;
this.tableIds = tableIds;
this.commitSeq = commitSeq;
this.commitSeq = -1;
this.tableIds = Collections.emptyList();
this.tableCommitSeqMap = Maps.newHashMap();
}
public BinlogTombstone(long dbId, long commitSeq) {
public BinlogTombstone(long tableId, long commitSeq) {
this.dbBinlogTombstone = false;
this.dbId = dbId;
this.tableIds = null;
this.commitSeq = commitSeq;
}
public BinlogTombstone(long dbId, long tableId, long commitSeq) {
this.dbBinlogTombstone = false;
this.dbId = dbId;
this.tableIds = Collections.singletonList(tableId);
this.dbId = -1;
this.commitSeq = commitSeq;
this.tableIds = Collections.emptyList();
this.tableCommitSeqMap = Collections.singletonMap(tableId, commitSeq);
}
public void addTableRecord(long tableId, UpsertRecord upsertRecord) {
@ -75,6 +76,14 @@ public class BinlogTombstone {
tableVersionMap.putAll(records);
}
// Can only be used to merge tombstone of the same db
public void mergeTableTombstone(BinlogTombstone tombstone) {
if (commitSeq < tombstone.getCommitSeq()) {
commitSeq = tombstone.getCommitSeq();
}
tableCommitSeqMap.putAll(tombstone.getTableCommitSeqMap());
}
public boolean isDbBinlogTomstone() {
return dbBinlogTombstone;
}
@ -83,10 +92,21 @@ public class BinlogTombstone {
return dbId;
}
// TODO(deadlinefen): delete this code later
public List<Long> getTableIds() {
if (tableIds == null) {
tableIds = Collections.emptyList();
}
return tableIds;
}
public Map<Long, Long> getTableCommitSeqMap() {
if (tableCommitSeqMap == null) {
tableCommitSeqMap = Collections.emptyMap();
}
return tableCommitSeqMap;
}
public long getCommitSeq() {
return commitSeq;
}

View File

@ -35,7 +35,6 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -160,13 +159,6 @@ public class DBBinlog {
} finally {
lock.writeLock().unlock();
}
lock.readLock().lock();
try {
LOG.info("[deadlinefen] after add, db {} binlogs: {}, dummys: {}", dbId, allBinlogs, tableDummyBinlogs);
} finally {
lock.readLock().unlock();
}
}
public long getDbId() {
@ -234,28 +226,26 @@ public class DBBinlog {
return tombstone;
}
private BinlogTombstone collectTableTombstone(List<BinlogTombstone> tableTombstones) {
private BinlogTombstone collectTableTombstone(List<BinlogTombstone> tableTombstones, boolean isDbGc) {
if (tableTombstones.isEmpty()) {
return null;
}
List<Long> tableIds = Lists.newArrayList();
long largestExpiredCommitSeq = -1;
BinlogTombstone dbTombstone = new BinlogTombstone(dbId, tableIds, -1);
BinlogTombstone dbTombstone = new BinlogTombstone(dbId, isDbGc);
for (BinlogTombstone tableTombstone : tableTombstones) {
long commitSeq = tableTombstone.getCommitSeq();
if (largestExpiredCommitSeq < commitSeq) {
largestExpiredCommitSeq = commitSeq;
}
// collect tableCommitSeq
dbTombstone.mergeTableTombstone(tableTombstone);
// collect tableVersionMap
Map<Long, UpsertRecord.TableRecord> tableVersionMap = tableTombstone.getTableVersionMap();
if (tableVersionMap.size() > 1) {
LOG.warn("tableVersionMap size is greater than 1. tableVersionMap: {}", tableVersionMap);
}
tableIds.addAll(tableTombstone.getTableIds());
dbTombstone.addTableRecord(tableVersionMap);
}
dbTombstone.setCommitSeq(largestExpiredCommitSeq);
LOG.info("After GC, dbId: {}, dbExpiredBinlog: {}, tableExpiredBinlogs: {}",
dbId, dbTombstone.getCommitSeq(), dbTombstone.getTableCommitSeqMap());
return dbTombstone;
}
@ -277,17 +267,9 @@ public class DBBinlog {
tombstones.add(tombstone);
}
}
BinlogTombstone tombstone = collectTableTombstone(tombstones);
BinlogTombstone tombstone = collectTableTombstone(tombstones, false);
if (tombstone != null) {
removeExpiredMetaData(tombstone.getCommitSeq());
lock.readLock().lock();
try {
LOG.info("[deadlinefen] after gc, db {} binlogs: {}, tombstone.seq: {}, dummys: {}",
dbId, allBinlogs, tombstone.getCommitSeq(), tableDummyBinlogs);
} finally {
lock.readLock().unlock();
}
}
return tombstone;
@ -329,7 +311,7 @@ public class DBBinlog {
private BinlogTombstone dbBinlogEnableGc(long expiredMs) {
// step 1: get current tableBinlog info and expiredCommitSeq
long expiredCommitSeq = -1;
lock.readLock().lock();
lock.writeLock().lock();
try {
Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
while (timeIter.hasNext()) {
@ -355,7 +337,7 @@ public class DBBinlog {
}
}
} finally {
lock.readLock().unlock();
lock.writeLock().unlock();
}
if (expiredCommitSeq == -1) {
@ -372,15 +354,7 @@ public class DBBinlog {
}
}
lock.readLock().lock();
try {
LOG.info("[deadlinefen] after gc, db {} binlogs: {}, tombstone.seq: {}, dummys: {}",
dbId, allBinlogs, expiredCommitSeq, tableDummyBinlogs);
} finally {
lock.readLock().unlock();
}
return collectTableTombstone(tableTombstones);
return collectTableTombstone(tableTombstones, true);
}
public void replayGc(BinlogTombstone tombstone) {
@ -407,11 +381,14 @@ public class DBBinlog {
}
}
Iterator<TBinlog> binlogIterator = allBinlogs.iterator();
while (binlogIterator.hasNext()) {
TBinlog binlog = binlogIterator.next();
Iterator<TBinlog> binlogIter = allBinlogs.iterator();
TBinlog dummy = binlogIter.next();
dummy.setCommitSeq(largestExpiredCommitSeq);
while (binlogIter.hasNext()) {
TBinlog binlog = binlogIter.next();
if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
binlogIterator.remove();
binlogIter.remove();
} else {
break;
}
@ -426,22 +403,33 @@ public class DBBinlog {
public void dbBinlogDisableReplayGc(BinlogTombstone tombstone) {
List<TableBinlog> tableBinlogs;
lock.writeLock().lock();
lock.readLock().lock();
try {
tableBinlogs = Lists.newArrayList(tableBinlogMap.values());
} finally {
lock.writeLock().unlock();
lock.readLock().unlock();
}
if (tableBinlogs.isEmpty()) {
return;
}
Set<Long> tableIds = Sets.newHashSet(tombstone.getTableIds());
long largestExpiredCommitSeq = tombstone.getCommitSeq();
Map<Long, Long> tableCommitSeqMap = tombstone.getTableCommitSeqMap();
// TODO(deadlinefen): delete this code
// This is a reserved code for the transition between new and old versions.
// It will be deleted later
if (tableCommitSeqMap.isEmpty()) {
long commitSeq = tombstone.getCommitSeq();
List<Long> tableIds = tombstone.getTableIds();
for (long tableId : tableIds) {
tableCommitSeqMap.put(tableId, commitSeq);
}
}
for (TableBinlog tableBinlog : tableBinlogs) {
if (tableIds.contains(tableBinlog.getTableId())) {
tableBinlog.replayGc(largestExpiredCommitSeq);
long tableId = tableBinlog.getTableId();
if (tableCommitSeqMap.containsKey(tableId)) {
tableBinlog.replayGc(tableCommitSeqMap.get(tableId));
}
}
}

View File

@ -77,7 +77,6 @@ public class TableBinlog {
try {
binlogs.add(binlog);
++binlog.table_ref;
LOG.info("[deadlinefen] after add, table {} binlogs: {}", tableId, binlogs);
} finally {
lock.writeLock().unlock();
}
@ -154,20 +153,12 @@ public class TableBinlog {
TBinlog lastUpsertBinlog = tombstoneInfo.first;
long largestCommitSeq = tombstoneInfo.second;
BinlogTombstone tombstone = new BinlogTombstone(-1, largestCommitSeq);
BinlogTombstone tombstone = new BinlogTombstone(tableId, largestCommitSeq);
if (lastUpsertBinlog != null) {
UpsertRecord upsertRecord = UpsertRecord.fromJson(lastUpsertBinlog.getData());
tombstone.addTableRecord(tableId, upsertRecord);
}
lock.readLock().lock();
try {
LOG.info("[deadlinefen] after gc, table {} binlogs: {}, tombstone.seq: {}",
tableId, binlogs, tombstone.getCommitSeq());
} finally {
lock.readLock().unlock();
}
return tombstone;
}
@ -191,7 +182,6 @@ public class TableBinlog {
return null;
}
long dbId = db.getId();
long ttlSeconds = table.getBinlogConfig().getTtlSeconds();
long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
@ -217,36 +207,37 @@ public class TableBinlog {
TBinlog lastUpsertBinlog = tombstoneInfo.first;
long largestCommitSeq = tombstoneInfo.second;
BinlogTombstone tombstone = new BinlogTombstone(dbId, tableId, largestCommitSeq);
BinlogTombstone tombstone = new BinlogTombstone(tableId, largestCommitSeq);
if (lastUpsertBinlog != null) {
UpsertRecord upsertRecord = UpsertRecord.fromJson(lastUpsertBinlog.getData());
tombstone.addTableRecord(tableId, upsertRecord);
}
lock.readLock().lock();
try {
LOG.info("[deadlinefen] after gc, table {} binlogs: {}, tombstone.seq: {}",
tableId, binlogs, tombstone.getCommitSeq());
} finally {
lock.readLock().unlock();
}
return tombstone;
}
public void replayGc(long largestExpiredCommitSeq) {
lock.writeLock().lock();
try {
long lastSeq = -1;
Iterator<TBinlog> iter = binlogs.iterator();
TBinlog dummyBinlog = iter.next();
while (iter.hasNext()) {
TBinlog binlog = iter.next();
if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
long commitSeq = binlog.getCommitSeq();
if (commitSeq <= largestExpiredCommitSeq) {
lastSeq = commitSeq;
--binlog.table_ref;
iter.remove();
} else {
break;
}
}
if (lastSeq != -1) {
dummyBinlog.setCommitSeq(lastSeq);
}
} finally {
lock.writeLock().unlock();
}