From 667e4ea99bbbc6c25226123d19790b8539ef989f Mon Sep 17 00:00:00 2001 From: DeadlineFen <117912096+deadlinefen@users.noreply.github.com> Date: Mon, 24 Jul 2023 14:33:16 +0800 Subject: [PATCH] [Fix](binlog) Fix bugs in tombstone (#22031) --- .../apache/doris/binlog/BinlogTombstone.java | 48 +++++++---- .../org/apache/doris/binlog/DBBinlog.java | 84 ++++++++----------- .../org/apache/doris/binlog/TableBinlog.java | 35 +++----- 3 files changed, 83 insertions(+), 84 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java index 50d9f90a01..2b6e3cb8e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogTombstone.java @@ -36,33 +36,34 @@ public class BinlogTombstone { @SerializedName(value = "commitSeq") private long commitSeq; + // TODO(deadlinefen): delete this field later + // This is a reserved field for the transition between new and old versions. + // It will be deleted later @SerializedName(value = "tableIds") private List tableIds; + @SerializedName(value = "tableCommitSeqMap") + private Map tableCommitSeqMap; + @SerializedName(value = "tableVersionMap") // this map keep last upsert record // only for master fe to send be gc task, not need persist private Map tableVersionMap = Maps.newHashMap(); - public BinlogTombstone(long dbId, List tableIds, long commitSeq) { - this.dbBinlogTombstone = true; + public BinlogTombstone(long dbId, boolean isDbTombstone) { + this.dbBinlogTombstone = isDbTombstone; this.dbId = dbId; - this.tableIds = tableIds; - this.commitSeq = commitSeq; + this.commitSeq = -1; + this.tableIds = Collections.emptyList(); + this.tableCommitSeqMap = Maps.newHashMap(); } - public BinlogTombstone(long dbId, long commitSeq) { + public BinlogTombstone(long tableId, long commitSeq) { this.dbBinlogTombstone = false; - this.dbId = dbId; - this.tableIds = null; - this.commitSeq = commitSeq; - } - - public BinlogTombstone(long dbId, long tableId, long commitSeq) { - this.dbBinlogTombstone = false; - this.dbId = dbId; - this.tableIds = Collections.singletonList(tableId); + this.dbId = -1; this.commitSeq = commitSeq; + this.tableIds = Collections.emptyList(); + this.tableCommitSeqMap = Collections.singletonMap(tableId, commitSeq); } public void addTableRecord(long tableId, UpsertRecord upsertRecord) { @@ -75,6 +76,14 @@ public class BinlogTombstone { tableVersionMap.putAll(records); } + // Can only be used to merge tombstone of the same db + public void mergeTableTombstone(BinlogTombstone tombstone) { + if (commitSeq < tombstone.getCommitSeq()) { + commitSeq = tombstone.getCommitSeq(); + } + tableCommitSeqMap.putAll(tombstone.getTableCommitSeqMap()); + } + public boolean isDbBinlogTomstone() { return dbBinlogTombstone; } @@ -83,10 +92,21 @@ public class BinlogTombstone { return dbId; } + // TODO(deadlinefen): delete this code later public List getTableIds() { + if (tableIds == null) { + tableIds = Collections.emptyList(); + } return tableIds; } + public Map getTableCommitSeqMap() { + if (tableCommitSeqMap == null) { + tableCommitSeqMap = Collections.emptyMap(); + } + return tableCommitSeqMap; + } + public long getCommitSeq() { return commitSeq; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 58708c8fe6..151c5e5be9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -35,7 +35,6 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -160,13 +159,6 @@ public class DBBinlog { } finally { lock.writeLock().unlock(); } - - lock.readLock().lock(); - try { - LOG.info("[deadlinefen] after add, db {} binlogs: {}, dummys: {}", dbId, allBinlogs, tableDummyBinlogs); - } finally { - lock.readLock().unlock(); - } } public long getDbId() { @@ -234,28 +226,26 @@ public class DBBinlog { return tombstone; } - private BinlogTombstone collectTableTombstone(List tableTombstones) { + private BinlogTombstone collectTableTombstone(List tableTombstones, boolean isDbGc) { if (tableTombstones.isEmpty()) { return null; } - List tableIds = Lists.newArrayList(); - long largestExpiredCommitSeq = -1; - BinlogTombstone dbTombstone = new BinlogTombstone(dbId, tableIds, -1); + BinlogTombstone dbTombstone = new BinlogTombstone(dbId, isDbGc); for (BinlogTombstone tableTombstone : tableTombstones) { - long commitSeq = tableTombstone.getCommitSeq(); - if (largestExpiredCommitSeq < commitSeq) { - largestExpiredCommitSeq = commitSeq; - } + // collect tableCommitSeq + dbTombstone.mergeTableTombstone(tableTombstone); + + // collect tableVersionMap Map tableVersionMap = tableTombstone.getTableVersionMap(); if (tableVersionMap.size() > 1) { LOG.warn("tableVersionMap size is greater than 1. tableVersionMap: {}", tableVersionMap); } - tableIds.addAll(tableTombstone.getTableIds()); dbTombstone.addTableRecord(tableVersionMap); } - dbTombstone.setCommitSeq(largestExpiredCommitSeq); + LOG.info("After GC, dbId: {}, dbExpiredBinlog: {}, tableExpiredBinlogs: {}", + dbId, dbTombstone.getCommitSeq(), dbTombstone.getTableCommitSeqMap()); return dbTombstone; } @@ -277,17 +267,9 @@ public class DBBinlog { tombstones.add(tombstone); } } - BinlogTombstone tombstone = collectTableTombstone(tombstones); + BinlogTombstone tombstone = collectTableTombstone(tombstones, false); if (tombstone != null) { removeExpiredMetaData(tombstone.getCommitSeq()); - - lock.readLock().lock(); - try { - LOG.info("[deadlinefen] after gc, db {} binlogs: {}, tombstone.seq: {}, dummys: {}", - dbId, allBinlogs, tombstone.getCommitSeq(), tableDummyBinlogs); - } finally { - lock.readLock().unlock(); - } } return tombstone; @@ -329,7 +311,7 @@ public class DBBinlog { private BinlogTombstone dbBinlogEnableGc(long expiredMs) { // step 1: get current tableBinlog info and expiredCommitSeq long expiredCommitSeq = -1; - lock.readLock().lock(); + lock.writeLock().lock(); try { Iterator> timeIter = timestamps.iterator(); while (timeIter.hasNext()) { @@ -355,7 +337,7 @@ public class DBBinlog { } } } finally { - lock.readLock().unlock(); + lock.writeLock().unlock(); } if (expiredCommitSeq == -1) { @@ -372,15 +354,7 @@ public class DBBinlog { } } - lock.readLock().lock(); - try { - LOG.info("[deadlinefen] after gc, db {} binlogs: {}, tombstone.seq: {}, dummys: {}", - dbId, allBinlogs, expiredCommitSeq, tableDummyBinlogs); - } finally { - lock.readLock().unlock(); - } - - return collectTableTombstone(tableTombstones); + return collectTableTombstone(tableTombstones, true); } public void replayGc(BinlogTombstone tombstone) { @@ -407,11 +381,14 @@ public class DBBinlog { } } - Iterator binlogIterator = allBinlogs.iterator(); - while (binlogIterator.hasNext()) { - TBinlog binlog = binlogIterator.next(); + Iterator binlogIter = allBinlogs.iterator(); + TBinlog dummy = binlogIter.next(); + dummy.setCommitSeq(largestExpiredCommitSeq); + + while (binlogIter.hasNext()) { + TBinlog binlog = binlogIter.next(); if (binlog.getCommitSeq() <= largestExpiredCommitSeq) { - binlogIterator.remove(); + binlogIter.remove(); } else { break; } @@ -426,22 +403,33 @@ public class DBBinlog { public void dbBinlogDisableReplayGc(BinlogTombstone tombstone) { List tableBinlogs; - lock.writeLock().lock(); + lock.readLock().lock(); try { tableBinlogs = Lists.newArrayList(tableBinlogMap.values()); } finally { - lock.writeLock().unlock(); + lock.readLock().unlock(); } if (tableBinlogs.isEmpty()) { return; } - Set tableIds = Sets.newHashSet(tombstone.getTableIds()); - long largestExpiredCommitSeq = tombstone.getCommitSeq(); + Map tableCommitSeqMap = tombstone.getTableCommitSeqMap(); + // TODO(deadlinefen): delete this code + // This is a reserved code for the transition between new and old versions. + // It will be deleted later + if (tableCommitSeqMap.isEmpty()) { + long commitSeq = tombstone.getCommitSeq(); + List tableIds = tombstone.getTableIds(); + for (long tableId : tableIds) { + tableCommitSeqMap.put(tableId, commitSeq); + } + } + for (TableBinlog tableBinlog : tableBinlogs) { - if (tableIds.contains(tableBinlog.getTableId())) { - tableBinlog.replayGc(largestExpiredCommitSeq); + long tableId = tableBinlog.getTableId(); + if (tableCommitSeqMap.containsKey(tableId)) { + tableBinlog.replayGc(tableCommitSeqMap.get(tableId)); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 9b82272f63..8934084e99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -77,7 +77,6 @@ public class TableBinlog { try { binlogs.add(binlog); ++binlog.table_ref; - LOG.info("[deadlinefen] after add, table {} binlogs: {}", tableId, binlogs); } finally { lock.writeLock().unlock(); } @@ -154,20 +153,12 @@ public class TableBinlog { TBinlog lastUpsertBinlog = tombstoneInfo.first; long largestCommitSeq = tombstoneInfo.second; - BinlogTombstone tombstone = new BinlogTombstone(-1, largestCommitSeq); + BinlogTombstone tombstone = new BinlogTombstone(tableId, largestCommitSeq); if (lastUpsertBinlog != null) { UpsertRecord upsertRecord = UpsertRecord.fromJson(lastUpsertBinlog.getData()); tombstone.addTableRecord(tableId, upsertRecord); } - lock.readLock().lock(); - try { - LOG.info("[deadlinefen] after gc, table {} binlogs: {}, tombstone.seq: {}", - tableId, binlogs, tombstone.getCommitSeq()); - } finally { - lock.readLock().unlock(); - } - return tombstone; } @@ -191,7 +182,6 @@ public class TableBinlog { return null; } - long dbId = db.getId(); long ttlSeconds = table.getBinlogConfig().getTtlSeconds(); long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds); @@ -217,36 +207,37 @@ public class TableBinlog { TBinlog lastUpsertBinlog = tombstoneInfo.first; long largestCommitSeq = tombstoneInfo.second; - BinlogTombstone tombstone = new BinlogTombstone(dbId, tableId, largestCommitSeq); + BinlogTombstone tombstone = new BinlogTombstone(tableId, largestCommitSeq); if (lastUpsertBinlog != null) { UpsertRecord upsertRecord = UpsertRecord.fromJson(lastUpsertBinlog.getData()); tombstone.addTableRecord(tableId, upsertRecord); } - lock.readLock().lock(); - try { - LOG.info("[deadlinefen] after gc, table {} binlogs: {}, tombstone.seq: {}", - tableId, binlogs, tombstone.getCommitSeq()); - } finally { - lock.readLock().unlock(); - } - - return tombstone; } public void replayGc(long largestExpiredCommitSeq) { lock.writeLock().lock(); try { + long lastSeq = -1; Iterator iter = binlogs.iterator(); + TBinlog dummyBinlog = iter.next(); + while (iter.hasNext()) { TBinlog binlog = iter.next(); - if (binlog.getCommitSeq() <= largestExpiredCommitSeq) { + long commitSeq = binlog.getCommitSeq(); + if (commitSeq <= largestExpiredCommitSeq) { + lastSeq = commitSeq; + --binlog.table_ref; iter.remove(); } else { break; } } + + if (lastSeq != -1) { + dummyBinlog.setCommitSeq(lastSeq); + } } finally { lock.writeLock().unlock(); }