[feature](binlog) Support gc binlogs by history nums and size (#35250)

* [chore](binlog) Add logs about binlog gc (#34359)

* [feature](binlog) Support gc binlogs by history nums and size (#34888)
Author: walter
Date: 2024-05-23 14:39:57 +08:00
Committed by: GitHub
Parent: 0b440685d9
Commit: acf741fa80

9 changed files with 180 additions and 100 deletions
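
For context, the GC policy after this change evicts the oldest binlogs first and keeps removing while any of three conditions holds: the binlog is older than the TTL, the retained bytes exceed maxBytes, or the retained count exceeds maxHistoryNums. A minimal, self-contained sketch of that policy follows; the Binlog record, the thresholds, and the checker name are hypothetical stand-ins for TBinlog/BinlogConfig/BinlogComparator, not the exact code in the hunks below.

import java.util.Comparator;
import java.util.Iterator;
import java.util.TreeSet;

// Sketch of the GC policy introduced by this commit. A binlog is eligible for GC when it
// is older than the TTL, or when keeping it would exceed the configured max bytes or max
// history nums. "Binlog" is a hypothetical stand-in for TBinlog; the thresholds are made up.
public class BinlogGcPolicySketch {
    record Binlog(long commitSeq, long timestampMs, long approxBytes) {}

    interface ExpiredChecker { // plays the role of the simplified BinlogComparator
        boolean isExpired(Binlog binlog);
    }

    public static void main(String[] args) {
        long nowMs = System.currentTimeMillis();
        long ttlSeconds = 3600, maxBytes = 1024, maxHistoryNums = 2; // assumed thresholds
        long expiredMs = nowMs - ttlSeconds * 1000;

        TreeSet<Binlog> binlogs = new TreeSet<>(Comparator.comparingLong(Binlog::commitSeq));
        binlogs.add(new Binlog(1, nowMs - 7_200_000, 700)); // older than the TTL
        binlogs.add(new Binlog(2, nowMs - 60_000, 700));
        binlogs.add(new Binlog(3, nowMs - 30_000, 700));
        long[] retainedBytes = {binlogs.stream().mapToLong(Binlog::approxBytes).sum()};

        // The checker captures the thresholds, so it only needs the binlog itself. Reading
        // binlogs.size() while removing through the iterator is valid, as the real code notes.
        ExpiredChecker checker = b -> b.timestampMs() <= expiredMs
                || maxBytes < retainedBytes[0]
                || maxHistoryNums < binlogs.size();

        Iterator<Binlog> iter = binlogs.iterator();
        while (iter.hasNext()) {
            Binlog b = iter.next();
            if (!checker.isExpired(b)) {
                break; // ordered by commitSeq, so everything after this point is still alive
            }
            iter.remove();
            retainedBytes[0] -= b.approxBytes();
        }
        System.out.println("kept " + binlogs.size() + " binlogs, " + retainedBytes[0] + " bytes");
    }
}

This is also why BinlogComparator.isExpired() loses its long expired parameter in this diff: the thresholds are captured by the caller's lambda instead of being passed on every call.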


@ -20,5 +20,5 @@ package org.apache.doris.binlog;
import org.apache.doris.thrift.TBinlog;
public interface BinlogComparator {
boolean isExpired(TBinlog binlog, long expired);
boolean isExpired(TBinlog binlog);
}


@ -41,6 +41,8 @@ public class BinlogConfigCache {
lock = new ReentrantReadWriteLock();
}
// Get the binlog config of the specified db, return null if no such database
// exists.
public BinlogConfig getDBBinlogConfig(long dbId) {
lock.readLock().lock();
BinlogConfig binlogConfig = dbTableBinlogEnableMap.get(dbId);
@ -110,7 +112,8 @@ public class BinlogConfigCache {
OlapTable olapTable = (OlapTable) table;
tableBinlogConfig = olapTable.getBinlogConfig();
// get table binlog config, when table modify binlogConfig
// it create a new binlog, not update inplace, so we don't need to clone binlogConfig
// it create a new binlog, not update inplace, so we don't need to clone
// binlogConfig
dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
return tableBinlogConfig;
} catch (Exception e) {


@ -58,7 +58,7 @@ public class BinlogGcer extends MasterDaemon {
try {
List<BinlogTombstone> tombstones = Env.getCurrentEnv().getBinlogManager().gc();
if (tombstones != null && !tombstones.isEmpty()) {
LOG.info("tomebstones size: {}", tombstones.size());
LOG.info("tombstones size: {}", tombstones.size());
} else {
LOG.info("no gc binlog");
return;


@ -58,9 +58,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class BinlogManager {
private static final int BUFFER_SIZE = 16 * 1024;
private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("Name")
.add("Type").add("Id").add("Dropped").add("BinlogLength").add("FirstBinlogCommittedTime")
.add("Type").add("Id").add("Dropped").add("BinlogLength").add("BinlogSize").add("FirstBinlogCommittedTime")
.add("ReadableFirstBinlogCommittedTime").add("LastBinlogCommittedTime")
.add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds")
.add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds").add("BinlogMaxBytes")
.add("BinlogMaxHistoryNums")
.build();
private static final Logger LOG = LogManager.getLogger(BinlogManager.class);


@ -81,6 +81,7 @@ public class BinlogUtils {
return dummy;
}
// Compute the expired timestamp in milliseconds.
public static long getExpiredMs(long ttlSeconds) {
long currentSeconds = System.currentTimeMillis() / 1000;
if (currentSeconds < ttlSeconds) {
@ -94,4 +95,11 @@ public class BinlogUtils {
public static String convertTimeToReadable(long time) {
return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date(time));
}
public static long getApproximateMemoryUsage(TBinlog binlog) {
/* object layout: header + body + padding */
final long objSize = 80; // 9 fields and 1 header
String data = binlog.getData();
return objSize + binlog.getTableIdsSize() * 8 + (data == null ? 0 : data.length());
}
}
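
The estimate above (a fixed ~80-byte object overhead, 8 bytes per table id, plus the length of the data string) feeds a running binlogSize counter that DBBinlog and TableBinlog maintain incrementally in the hunks below: it is added when a binlog is registered and subtracted when a binlog is removed during GC. A minimal sketch of that bookkeeping, with a hypothetical Binlog stand-in for TBinlog:

import java.util.List;

// Sketch of the size bookkeeping this commit adds around getApproximateMemoryUsage():
// an approximate per-binlog footprint is added when a binlog is registered and subtracted
// when it is GC'ed, so the running total can be compared against BinlogConfig.getMaxBytes()
// without rescanning. "Binlog" is a hypothetical stand-in for TBinlog.
public class BinlogSizeSketch {
    record Binlog(List<Long> tableIds, String data) {}

    private long binlogSize = 0; // running total, like DBBinlog/TableBinlog.binlogSize below

    // Mirrors BinlogUtils.getApproximateMemoryUsage(): fixed object overhead
    // + 8 bytes per table id + the payload string length.
    static long approximateMemoryUsage(Binlog binlog) {
        final long objSize = 80;
        String data = binlog.data();
        return objSize + binlog.tableIds().size() * 8L + (data == null ? 0 : data.length());
    }

    void add(Binlog binlog) {
        binlogSize += approximateMemoryUsage(binlog);
    }

    void remove(Binlog binlog) {
        binlogSize -= approximateMemoryUsage(binlog);
    }

    public static void main(String[] args) {
        BinlogSizeSketch sketch = new BinlogSizeSketch();
        Binlog b = new Binlog(List.of(1001L, 1002L), "{\"upsert\": \"...\"}");
        sketch.add(b);
        System.out.println("approx bytes retained: " + sketch.binlogSize);
        sketch.remove(b);
        System.out.println("after gc: " + sketch.binlogSize);
    }
}

The same counter is the value reported by the new BinlogSize column added to TITLE_NAMES above.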


@ -45,6 +45,8 @@ public class DBBinlog {
private static final Logger LOG = LogManager.getLogger(BinlogManager.class);
private long dbId;
// The size of all binlogs.
private long binlogSize;
// guard for allBinlogs && tableBinlogMap
private ReentrantReadWriteLock lock;
// all binlogs contain table binlogs && create table binlog etc ...
@ -64,6 +66,7 @@ public class DBBinlog {
lock = new ReentrantReadWriteLock();
this.dbId = binlog.getDbId();
this.binlogConfigCache = binlogConfigCache;
this.binlogSize = 0;
// allBinlogs treeset order by commitSeq
allBinlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
@ -81,7 +84,7 @@ public class DBBinlog {
}
public static DBBinlog recoverDbBinlog(BinlogConfigCache binlogConfigCache, TBinlog dbDummy,
List<TBinlog> tableDummies, boolean dbBinlogEnable) {
List<TBinlog> tableDummies, boolean dbBinlogEnable) {
DBBinlog dbBinlog = new DBBinlog(binlogConfigCache, dbDummy);
long dbId = dbDummy.getDbId();
for (TBinlog tableDummy : tableDummies) {
@ -105,6 +108,7 @@ public class DBBinlog {
}
allBinlogs.add(binlog);
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
if (tableIds == null) {
return;
@ -119,12 +123,13 @@ public class DBBinlog {
}
}
// TODO(Drogon): remove TableBinlog after DropTable, think table drop && recovery
// TODO(Drogon): remove TableBinlog after DropTable, think table drop &&
// recovery
private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean dbBinlogEnable) {
TableBinlog tableBinlog = tableBinlogMap.get(tableId);
if (tableBinlog == null) {
if (dbBinlogEnable || binlogConfigCache.isEnableTable(dbId, tableId)) {
tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, tableId);
tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, tableId);
tableBinlogMap.put(tableId, tableBinlog);
tableDummyBinlogs.add(tableBinlog.getDummyBinlog());
}
@ -132,7 +137,8 @@ public class DBBinlog {
return tableBinlog;
}
// guard by BinlogManager, if addBinlog called, more than one(db/tables) enable binlog
// guard by BinlogManager, if addBinlog called, more than one(db/tables) enable
// binlog
public void addBinlog(TBinlog binlog) {
boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
List<Long> tableIds = binlog.getTableIds();
@ -140,6 +146,7 @@ public class DBBinlog {
lock.writeLock().lock();
try {
allBinlogs.add(binlog);
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
if (binlog.getTimestamp() > 0 && dbBinlogEnable) {
timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
@ -226,14 +233,10 @@ public class DBBinlog {
return null;
}
boolean dbBinlogEnable = dbBinlogConfig.isEnable();
BinlogTombstone tombstone;
if (dbBinlogEnable) {
if (dbBinlogConfig.isEnable()) {
// db binlog is enabled, only one binlogTombstones
long ttlSeconds = dbBinlogConfig.getTtlSeconds();
long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
tombstone = dbBinlogEnableGc(expiredMs);
tombstone = dbBinlogEnableGc(dbBinlogConfig);
} else {
tombstone = dbBinlogDisableGc();
}
@ -277,7 +280,7 @@ public class DBBinlog {
}
for (TableBinlog tableBinlog : tableBinlogs) {
BinlogTombstone tombstone = tableBinlog.ttlGc();
BinlogTombstone tombstone = tableBinlog.gc();
if (tombstone != null) {
tombstones.add(tombstone);
}
@ -297,6 +300,7 @@ public class DBBinlog {
TBinlog dummy = binlogIter.next();
boolean foundFirstUsingBinlog = false;
long lastCommitSeq = -1;
long removed = 0;
while (binlogIter.hasNext()) {
TBinlog binlog = binlogIter.next();
@ -304,6 +308,8 @@ public class DBBinlog {
if (commitSeq <= largestExpiredCommitSeq) {
if (binlog.table_ref <= 0) {
binlogIter.remove();
binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog);
++removed;
if (!foundFirstUsingBinlog) {
lastCommitSeq = commitSeq;
}
@ -318,52 +324,92 @@ public class DBBinlog {
if (lastCommitSeq != -1) {
dummy.setCommitSeq(lastCommitSeq);
}
LOG.info("remove {} expired binlogs, dbId: {}, left: {}", removed, dbId, allBinlogs.size());
} finally {
lock.writeLock().unlock();
}
}
private BinlogTombstone dbBinlogEnableGc(long expiredMs) {
private TBinlog getLastExpiredBinlog(BinlogComparator checker) {
TBinlog lastExpiredBinlog = null;
Iterator<TBinlog> binlogIter = allBinlogs.iterator();
TBinlog dummy = binlogIter.next();
while (binlogIter.hasNext()) {
TBinlog binlog = binlogIter.next();
if (checker.isExpired(binlog)) {
binlogIter.remove();
binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog);
lastExpiredBinlog = binlog;
} else {
break;
}
}
if (lastExpiredBinlog != null) {
dummy.setCommitSeq(lastExpiredBinlog.getCommitSeq());
// release expired timestamps by commit seq.
Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
while (timeIter.hasNext() && timeIter.next().first <= lastExpiredBinlog.getCommitSeq()) {
timeIter.remove();
}
}
return lastExpiredBinlog;
}
private BinlogTombstone dbBinlogEnableGc(BinlogConfig dbBinlogConfig) {
long ttlSeconds = dbBinlogConfig.getTtlSeconds();
long maxBytes = dbBinlogConfig.getMaxBytes();
long maxHistoryNums = dbBinlogConfig.getMaxHistoryNums();
long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
LOG.info("gc db binlog. dbId: {}, expiredMs: {}, ttlSecond: {}, maxBytes: {}, maxHistoryNums: {}",
dbId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums);
// step 1: get current tableBinlog info and expiredCommitSeq
long expiredCommitSeq = -1;
TBinlog lastExpiredBinlog = null;
lock.writeLock().lock();
try {
long expiredCommitSeq = -1;
Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
while (timeIter.hasNext()) {
Pair<Long, Long> pair = timeIter.next();
if (pair.second <= expiredMs) {
expiredCommitSeq = pair.first;
timeIter.remove();
} else {
if (pair.second > expiredMs) {
break;
}
expiredCommitSeq = pair.first;
}
Iterator<TBinlog> binlogIter = allBinlogs.iterator();
TBinlog dummy = binlogIter.next();
dummy.setCommitSeq(expiredCommitSeq);
while (binlogIter.hasNext()) {
TBinlog binlog = binlogIter.next();
if (binlog.getCommitSeq() <= expiredCommitSeq) {
binlogIter.remove();
} else {
break;
}
}
final long lastExpiredCommitSeq = expiredCommitSeq;
BinlogComparator checker = (binlog) -> {
// NOTE: TreeSet read size during iterator remove is valid.
//
// The expired conditions in order:
// 1. expired time
// 2. the max bytes
// 3. the max history num
return binlog.getCommitSeq() <= lastExpiredCommitSeq
|| maxBytes < binlogSize
|| maxHistoryNums < allBinlogs.size();
};
lastExpiredBinlog = getLastExpiredBinlog(checker);
} finally {
lock.writeLock().unlock();
}
if (expiredCommitSeq == -1) {
if (lastExpiredBinlog == null) {
return null;
}
// step 2: gc every tableBinlog in dbBinlog, get table tombstone to complete db tombstone
// step 2: gc every tableBinlog in dbBinlog, get table tombstone to complete db
// tombstone
List<BinlogTombstone> tableTombstones = Lists.newArrayList();
for (TableBinlog tableBinlog : tableBinlogMap.values()) {
// step 2.1: gc tableBinlog,and get table tombstone
BinlogTombstone tableTombstone = tableBinlog.commitSeqGc(expiredCommitSeq);
BinlogTombstone tableTombstone = tableBinlog.commitSeqGc(lastExpiredBinlog.getCommitSeq());
if (tableTombstone != null) {
tableTombstones.add(tableTombstone);
}
@ -386,28 +432,8 @@ public class DBBinlog {
lock.writeLock().lock();
try {
Iterator<Pair<Long, Long>> timeIter = timestamps.iterator();
while (timeIter.hasNext()) {
long commitSeq = timeIter.next().first;
if (commitSeq <= largestExpiredCommitSeq) {
timeIter.remove();
} else {
break;
}
}
Iterator<TBinlog> binlogIter = allBinlogs.iterator();
TBinlog dummy = binlogIter.next();
dummy.setCommitSeq(largestExpiredCommitSeq);
while (binlogIter.hasNext()) {
TBinlog binlog = binlogIter.next();
if (binlog.getCommitSeq() <= largestExpiredCommitSeq) {
binlogIter.remove();
} else {
break;
}
}
BinlogComparator checker = (binlog) -> binlog.getCommitSeq() <= largestExpiredCommitSeq;
getLastExpiredBinlog(checker);
} finally {
lock.writeLock().unlock();
}
@ -478,6 +504,8 @@ public class DBBinlog {
info.add(dropped);
String binlogLength = String.valueOf(allBinlogs.size());
info.add(binlogLength);
String binlogSize = String.valueOf(this.binlogSize);
info.add(binlogSize);
String firstBinlogCommittedTime = null;
String readableFirstBinlogCommittedTime = null;
if (!timestamps.isEmpty()) {
@ -497,10 +525,16 @@ public class DBBinlog {
info.add(lastBinlogCommittedTime);
info.add(readableLastBinlogCommittedTime);
String binlogTtlSeconds = null;
String binlogMaxBytes = null;
String binlogMaxHistoryNums = null;
if (binlogConfig != null) {
binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds());
binlogMaxBytes = String.valueOf(binlogConfig.getMaxBytes());
binlogMaxHistoryNums = String.valueOf(binlogConfig.getMaxHistoryNums());
}
info.add(binlogTtlSeconds);
info.add(binlogMaxBytes);
info.add(binlogMaxHistoryNums);
result.addRow(info);
} else {


@ -27,6 +27,7 @@ import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -43,15 +44,23 @@ public class TableBinlog {
private long dbId;
private long tableId;
private long binlogSize;
private ReentrantReadWriteLock lock;
private TreeSet<TBinlog> binlogs;
// Pair(commitSeq, timestamp), used for gc
// need UpsertRecord to add timestamps for gc
private List<Pair<Long, Long>> timestamps;
private BinlogConfigCache binlogConfigCache;
public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, long dbId, long tableId) {
this.dbId = dbId;
this.tableId = tableId;
this.binlogSize = 0;
lock = new ReentrantReadWriteLock();
binlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
timestamps = Lists.newArrayList();
TBinlog dummy;
if (binlog.getType() == TBinlogType.DUMMY) {
@ -77,6 +86,10 @@ public class TableBinlog {
if (binlog.getCommitSeq() > dummy.getCommitSeq()) {
binlogs.add(binlog);
++binlog.table_ref;
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
if (binlog.getTimestamp() > 0) {
timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
}
}
}
@ -85,6 +98,10 @@ public class TableBinlog {
try {
binlogs.add(binlog);
++binlog.table_ref;
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
if (binlog.getTimestamp() > 0) {
timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
}
} finally {
lock.writeLock().unlock();
}
@ -108,7 +125,7 @@ public class TableBinlog {
}
}
private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(long expired, BinlogComparator checker) {
private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(BinlogComparator checker) {
if (binlogs.size() <= 1) {
return null;
}
@ -119,9 +136,10 @@ public class TableBinlog {
TBinlog lastExpiredBinlog = null;
while (iter.hasNext()) {
TBinlog binlog = iter.next();
if (checker.isExpired(binlog, expired)) {
if (checker.isExpired(binlog)) {
lastExpiredBinlog = binlog;
--binlog.table_ref;
binlogSize -= BinlogUtils.getApproximateMemoryUsage(binlog);
if (binlog.getType() == TBinlogType.UPSERT) {
tombstoneUpsert = binlog;
}
@ -135,9 +153,15 @@ public class TableBinlog {
return null;
}
dummyBinlog.setCommitSeq(lastExpiredBinlog.getCommitSeq());
long expiredCommitSeq = lastExpiredBinlog.getCommitSeq();
dummyBinlog.setCommitSeq(expiredCommitSeq);
return Pair.of(tombstoneUpsert, lastExpiredBinlog.getCommitSeq());
Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator();
while (timeIterator.hasNext() && timeIterator.next().first <= expiredCommitSeq) {
timeIterator.remove();
}
return Pair.of(tombstoneUpsert, expiredCommitSeq);
}
// this method call when db binlog enable
@ -147,8 +171,8 @@ public class TableBinlog {
// step 1: get tombstoneUpsertBinlog and dummyBinlog
lock.writeLock().lock();
try {
BinlogComparator check = (binlog, expire) -> binlog.getCommitSeq() <= expire;
tombstoneInfo = getLastUpsertAndLargestCommitSeq(expiredCommitSeq, check);
BinlogComparator check = (binlog) -> binlog.getCommitSeq() <= expiredCommitSeq;
tombstoneInfo = getLastUpsertAndLargestCommitSeq(check);
} finally {
lock.writeLock().unlock();
}
@ -171,7 +195,7 @@ public class TableBinlog {
}
// this method call when db binlog disable
public BinlogTombstone ttlGc() {
public BinlogTombstone gc() {
// step 1: get expire time
BinlogConfig tableBinlogConfig = binlogConfigCache.getTableBinlogConfig(dbId, tableId);
if (tableBinlogConfig == null) {
@ -179,19 +203,43 @@ public class TableBinlog {
}
long ttlSeconds = tableBinlogConfig.getTtlSeconds();
long maxBytes = tableBinlogConfig.getMaxBytes();
long maxHistoryNums = tableBinlogConfig.getMaxHistoryNums();
long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
if (expiredMs < 0) {
return null;
}
LOG.info("ttl gc. dbId: {}, tableId: {}, expiredMs: {}", dbId, tableId, expiredMs);
LOG.info(
"gc table binlog. dbId: {}, tableId: {}, expiredMs: {}, ttlSecond: {}, maxBytes: {}, "
+ "maxHistoryNums: {}, now: {}",
dbId, tableId, expiredMs, ttlSeconds, maxBytes, maxHistoryNums, System.currentTimeMillis());
// step 2: get tombstoneUpsertBinlog and dummyBinlog
Pair<TBinlog, Long> tombstoneInfo;
lock.writeLock().lock();
try {
BinlogComparator check = (binlog, expire) -> binlog.getTimestamp() <= expire;
tombstoneInfo = getLastUpsertAndLargestCommitSeq(expiredMs, check);
// find the last expired commit seq.
long expiredCommitSeq = -1;
Iterator<Pair<Long, Long>> timeIterator = timestamps.iterator();
while (timeIterator.hasNext()) {
Pair<Long, Long> entry = timeIterator.next();
if (expiredMs < entry.second) {
break;
}
expiredCommitSeq = entry.first;
}
final long lastExpiredCommitSeq = expiredCommitSeq;
BinlogComparator check = (binlog) -> {
// NOTE: TreeSet read size during iterator remove is valid.
//
// The expired conditions in order:
// 1. expired time
// 2. the max bytes
// 3. the max history num
return binlog.getCommitSeq() <= lastExpiredCommitSeq
|| maxBytes < binlogSize
|| maxHistoryNums < binlogs.size();
};
tombstoneInfo = getLastUpsertAndLargestCommitSeq(check);
} finally {
lock.writeLock().unlock();
}
@ -216,25 +264,8 @@ public class TableBinlog {
public void replayGc(long largestExpiredCommitSeq) {
lock.writeLock().lock();
try {
long lastSeq = -1;
Iterator<TBinlog> iter = binlogs.iterator();
TBinlog dummyBinlog = iter.next();
while (iter.hasNext()) {
TBinlog binlog = iter.next();
long commitSeq = binlog.getCommitSeq();
if (commitSeq <= largestExpiredCommitSeq) {
lastSeq = commitSeq;
--binlog.table_ref;
iter.remove();
} else {
break;
}
}
if (lastSeq != -1) {
dummyBinlog.setCommitSeq(lastSeq);
}
BinlogComparator checker = (binlog) -> binlog.getCommitSeq() <= largestExpiredCommitSeq;
getLastUpsertAndLargestCommitSeq(checker);
} finally {
lock.writeLock().unlock();
}
@ -278,6 +309,8 @@ public class TableBinlog {
info.add(dropped);
String binlogLength = String.valueOf(binlogs.size());
info.add(binlogLength);
String binlogSize = String.valueOf(this.binlogSize);
info.add(binlogSize);
String firstBinlogCommittedTime = null;
String readableFirstBinlogCommittedTime = null;
for (TBinlog binlog : binlogs) {
@ -305,10 +338,16 @@ public class TableBinlog {
info.add(lastBinlogCommittedTime);
info.add(readableLastBinlogCommittedTime);
String binlogTtlSeconds = null;
String binlogMaxBytes = null;
String binlogMaxHistoryNums = null;
if (binlogConfig != null) {
binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds());
binlogMaxBytes = String.valueOf(binlogConfig.getMaxBytes());
binlogMaxHistoryNums = String.valueOf(binlogConfig.getMaxHistoryNums());
}
info.add(binlogTtlSeconds);
info.add(binlogMaxBytes);
info.add(binlogMaxHistoryNums);
result.addRow(info);
} finally {


@ -277,14 +277,9 @@ public class BinlogManagerTest {
for (Map.Entry<Long, List<Long>> dbEntry : frameWork.entrySet()) {
long dbId = dbEntry.getKey();
for (long tableId : dbEntry.getValue()) {
if ((tableId / tableBaseId) % 2 != 0) {
addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow));
addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow));
++commitSeq;
} else {
addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, 0, 0));
addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, 0, 0));
}
addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow));
addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow));
++commitSeq;
}
}


@ -75,7 +75,7 @@ public class TableBinlogTest {
}
// trigger ttlGc
BinlogTombstone tombstone = tableBinlog.ttlGc();
BinlogTombstone tombstone = tableBinlog.gc();
// check binlog status
for (TBinlog binlog : testBinlogs) {