diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java new file mode 100644 index 0000000000..64a539a2b8 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogManagerTest.java @@ -0,0 +1,382 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.binlog; + +import org.apache.doris.catalog.BinlogConfig; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.persist.BinlogGcInfo; +import org.apache.doris.thrift.TBinlog; +import org.apache.doris.thrift.TBinlogType; +import org.apache.doris.thrift.TStatus; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.collect.Maps; +import mockit.Mock; +import mockit.MockUp; +import org.apache.hadoop.util.Lists; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Map; + +public class BinlogManagerTest { + private Map> frameWork; + + private int dbNum = 2; + private int tableNumPerDb = 3; + + private long dbBaseId = 10000; + private long tableBaseId = 100; + private long baseNum = 10000; + private long timeNow = baseNum; + private long ttl = 3; + + private boolean enableDbBinlog = false; + + @BeforeClass + public static void beforeClass() { + Config.enable_feature_binlog = true; + } + + @Before + public void setUp() { + Assert.assertTrue(tableNumPerDb < 100); + frameWork = Maps.newHashMap(); + for (int dbOff = 1; dbOff <= dbNum; ++dbOff) { + long dbId = dbOff * dbBaseId; + List tableIds = Lists.newArrayList(); + for (int tblOff = 1; tblOff <= tableNumPerDb; ++tblOff) { + tableIds.add(tableBaseId * tblOff + dbId); + } + frameWork.put(dbId, tableIds); + } + + new MockUp() { + @Mock + public BinlogConfig getDBBinlogConfig(long dbId) { + return new BinlogConfig(); + } + + @Mock + public BinlogConfig getTableBinlogConfig(long dbId, long tableId) { + return new BinlogConfig(); + } + + @Mock + public boolean isEnableTable(long dbId, long tableId) { + return true; + } + + @Mock + public boolean isEnableDB(long dbId) { + return enableDbBinlog; + } + }; + + new MockUp() { + @Mock + public long getTtlSeconds() { + return ttl; + } + + @Mock + public boolean isEnable() { + return enableDbBinlog; + } + }; + + new MockUp() { + @Mock + public InternalCatalog getCurrentInternalCatalog() { + return new InternalCatalog(); + } + }; + + new MockUp() { + @Mock + public Database getDbNullable(long dbId) { + return new Database(); + } + }; + + new MockUp() { + @Mock + public BinlogConfig getBinlogConfig() { + return new BinlogConfig(); + } + }; + } + + @Test + public void testGetBinlog() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + // reflect BinlogManager + Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class); + addBinlog.setAccessible(true); + + // init binlog manager & addBinlog + BinlogManager manager = new BinlogManager(); + + // insert table binlogs + int binlogNum = 10; + for (int i = 1; i <= binlogNum; ++i) { + TBinlog binlog = BinlogTestUtils.newBinlog(dbBaseId, tableBaseId, i, i); + if (i % 2 == 0) { + binlog.setType(TBinlogType.CREATE_TABLE); + } + addBinlog.invoke(manager, binlog); + + } + + // test get + Pair pair; + + // get too old + pair = manager.getBinlog(dbBaseId, tableBaseId, -99); + Assert.assertEquals(TStatusCode.BINLOG_TOO_OLD_COMMIT_SEQ, pair.first.getStatusCode()); + Assert.assertEquals(TBinlogType.DUMMY, pair.second.getType()); + + // get odd commit seq in table level ok + pair = manager.getBinlog(dbBaseId, tableBaseId, 5); + Assert.assertEquals(TStatusCode.OK, pair.first.getStatusCode()); + Assert.assertEquals(5 + 2, pair.second.getCommitSeq()); + + // get even commit seq in table level ok + pair = manager.getBinlog(dbBaseId, tableBaseId, 6); + Assert.assertEquals(TStatusCode.OK, pair.first.getStatusCode()); + Assert.assertEquals(6 + 1, pair.second.getCommitSeq()); + + // get odd commit seq in db level ok + pair = manager.getBinlog(dbBaseId, -1, 5); + Assert.assertEquals(TStatusCode.OK, pair.first.getStatusCode()); + Assert.assertEquals(5 + 1, pair.second.getCommitSeq()); + + // get even commit seq in db level ok + pair = manager.getBinlog(dbBaseId, -1, 6); + Assert.assertEquals(TStatusCode.OK, pair.first.getStatusCode()); + Assert.assertEquals(6 + 1, pair.second.getCommitSeq()); + + // get too new + pair = manager.getBinlog(dbBaseId, tableBaseId, 999); + Assert.assertEquals(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ, pair.first.getStatusCode()); + Assert.assertNull(pair.second); + } + + @Test + public void testPersist() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, + IOException, NoSuchFieldException { + // reflect BinlogManager + // addBinlog method + Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class); + addBinlog.setAccessible(true); + // dbBinlogMap + Field dbBinlogMapField = BinlogManager.class.getDeclaredField("dbBinlogMap"); + dbBinlogMapField.setAccessible(true); + + // init binlog manager & addBinlog + BinlogManager originManager = new BinlogManager(); + + // insert binlogs + long commitSeq = baseNum; + for (Map.Entry> dbEntry : frameWork.entrySet()) { + long dbId = dbEntry.getKey(); + for (long tableId : dbEntry.getValue()) { + addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, commitSeq)); + ++commitSeq; + } + } + + // init output stream + ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream outputStream = new DataOutputStream(arrayOutputStream); + + // serialize binlogs + originManager.write(outputStream, 0L); + + // init another binlog manager + BinlogManager newManager = new BinlogManager(); + + // deserialize binlogs + ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray()); + DataInputStream inputStream = new DataInputStream(arrayInputStream); + newManager.read(inputStream, 0L); + + // get origin & new dbbinlog's allbinlogs + Map originDbBinlogMap = (Map) dbBinlogMapField.get(originManager); + Map newDbBinlogMap = (Map) dbBinlogMapField.get(newManager); + Assert.assertEquals(originDbBinlogMap.size(), newDbBinlogMap.size()); + for (long dbId : frameWork.keySet()) { + List originBinlogList = Lists.newArrayList(); + List newBinlogList = Lists.newArrayList(); + originDbBinlogMap.get(dbId).getAllBinlogs(originBinlogList); + newDbBinlogMap.get(dbId).getAllBinlogs(newBinlogList); + Assert.assertEquals(originBinlogList.size(), newBinlogList.size()); + for (int i = 0; i < originBinlogList.size(); ++i) { + Assert.assertEquals(originBinlogList.get(i).getCommitSeq(), + newBinlogList.get(i).getCommitSeq()); + } + } + } + + @Test + public void testReplayGcFromTableLevel() throws NoSuchMethodException, InvocationTargetException, + IllegalAccessException, NoSuchFieldException { + // MockUp + new MockUp() { + @Mock + public long getExpiredMs(long ttl) { + return timeNow - ttl; + } + }; + + // reflect BinlogManager + // addBinlog method + Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class); + addBinlog.setAccessible(true); + // dbBinlogMap + Field dbBinlogMapField = BinlogManager.class.getDeclaredField("dbBinlogMap"); + dbBinlogMapField.setAccessible(true); + + // init binlog origin & new manager + BinlogManager originManager = new BinlogManager(); + BinlogManager newManager = new BinlogManager(); + + // insert binlogs + long commitSeq = 0; + for (Map.Entry> dbEntry : frameWork.entrySet()) { + long dbId = dbEntry.getKey(); + for (long tableId : dbEntry.getValue()) { + if ((tableId / tableBaseId) % 2 != 0) { + addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow)); + addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow)); + ++commitSeq; + } else { + addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, 0, 0)); + addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, 0, 0)); + } + } + } + + // origin manager gc & get BinlogGcInfo + BinlogGcInfo info = new BinlogGcInfo(originManager.gc()); + + // new manager replay gc + newManager.replayGc(info); + + // get origin & new dbbinlog's allbinlogs + Map originDbBinlogMap = (Map) dbBinlogMapField.get(originManager); + Map newDbBinlogMap = (Map) dbBinlogMapField.get(newManager); + Assert.assertEquals(originDbBinlogMap.size(), newDbBinlogMap.size()); + for (long dbId : frameWork.keySet()) { + List originBinlogList = Lists.newArrayList(); + List newBinlogList = Lists.newArrayList(); + originDbBinlogMap.get(dbId).getAllBinlogs(originBinlogList); + newDbBinlogMap.get(dbId).getAllBinlogs(newBinlogList); + Assert.assertEquals(originBinlogList.size(), newBinlogList.size()); + for (int i = 0; i < originBinlogList.size(); ++i) { + TBinlog originBinlog = originBinlogList.get(i); + TBinlog newBinlog = newBinlogList.get(i); + Assert.assertEquals(originBinlog.getCommitSeq(), newBinlog.getCommitSeq()); + if (newBinlog.getType() != TBinlogType.DUMMY) { + Assert.assertTrue(newBinlog.getTimestamp() > timeNow - ttl); + } + } + } + } + + @Test + public void testReplayGcFromDbLevel() throws NoSuchMethodException, InvocationTargetException, + IllegalAccessException, NoSuchFieldException { + // MockUp + new MockUp() { + @Mock + public long getExpiredMs(long ttl) { + return timeNow - ttl; + } + }; + + // set dbBinlogEnable + enableDbBinlog = true; + + // reflect BinlogManager + // addBinlog method + Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class); + addBinlog.setAccessible(true); + // dbBinlogMap + Field dbBinlogMapField = BinlogManager.class.getDeclaredField("dbBinlogMap"); + dbBinlogMapField.setAccessible(true); + + // init binlog origin & new manager + BinlogManager originManager = new BinlogManager(); + BinlogManager newManager = new BinlogManager(); + + // insert binlogs + long commitSeq = baseNum; + for (Map.Entry> dbEntry : frameWork.entrySet()) { + long dbId = dbEntry.getKey(); + for (long tableId : dbEntry.getValue()) { + ++commitSeq; + addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, commitSeq)); + addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, commitSeq)); + } + } + timeNow = commitSeq; + + // origin manager gc & get BinlogGcInfo + BinlogGcInfo info = new BinlogGcInfo(originManager.gc()); + + // new manager replay gc + newManager.replayGc(info); + + // get origin & new dbbinlog's allbinlogs + Map originDbBinlogMap = (Map) dbBinlogMapField.get(originManager); + Map newDbBinlogMap = (Map) dbBinlogMapField.get(newManager); + Assert.assertEquals(originDbBinlogMap.size(), newDbBinlogMap.size()); + for (Map.Entry> dbEntry : frameWork.entrySet()) { + long dbId = dbEntry.getKey(); + List originBinlogList = Lists.newArrayList(); + List newBinlogList = Lists.newArrayList(); + originDbBinlogMap.get(dbId).getAllBinlogs(originBinlogList); + newDbBinlogMap.get(dbId).getAllBinlogs(newBinlogList); + Assert.assertEquals(originBinlogList.size(), newBinlogList.size()); + for (int i = 0; i < originBinlogList.size(); ++i) { + TBinlog originBinlog = originBinlogList.get(i); + TBinlog newBinlog = newBinlogList.get(i); + Assert.assertEquals(originBinlog.getCommitSeq(), newBinlog.getCommitSeq()); + if (newBinlog.getType() != TBinlogType.DUMMY) { + Assert.assertTrue(newBinlog.getCommitSeq() > timeNow - ttl); + } + } + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogTestUtils.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogTestUtils.java new file mode 100644 index 0000000000..af5eabfa3d --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/BinlogTestUtils.java @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.binlog; + +import org.apache.doris.catalog.BinlogConfig; +import org.apache.doris.thrift.TBinlog; +import org.apache.doris.thrift.TBinlogType; + +import com.google.common.collect.Maps; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class BinlogTestUtils { + + public static final long MAX_BYTES = 0x7fffffffffffffffL; + public static final long MAX_HISTORY_NUMS = 0x7fffffffffffffffL; + + public static BinlogConfig newTestBinlogConfig(boolean enableBinlog, long expiredTime) { + return new BinlogConfig(enableBinlog, expiredTime, MAX_BYTES, MAX_HISTORY_NUMS); + } + + public static BinlogConfigCache newMockBinlogConfigCache(long dbId, long tableId, long expiredTime) { + BinlogConfig binlogConfig = newTestBinlogConfig(true, expiredTime); + return new MockBinlogConfigCache( + Collections.singletonMap(String.format("%d_%d", dbId, tableId), binlogConfig)); + } + + public static MockBinlogConfigCache newMockBinlogConfigCache(Map ttlMap) { + Map configMap = Maps.newHashMap(); + for (Map.Entry entry : ttlMap.entrySet()) { + configMap.put(entry.getKey(), newTestBinlogConfig(true, entry.getValue())); + } + return new MockBinlogConfigCache(configMap); + } + + public static TBinlog newBinlog(long dbId, long tableId, long commitSeq, long timestamp) { + TBinlog binlog = new TBinlog(); + binlog.setDbId(dbId); + binlog.setTableIds(Collections.singletonList(tableId)); + binlog.setType(TBinlogType.ALTER_JOB); + binlog.setCommitSeq(commitSeq); + binlog.setTimestamp(timestamp); + binlog.setTableRef(0); + binlog.setBelong(-1); + return binlog; + } + + public static TBinlog newBinlog(long dbId, List tableIds, long commitSeq, long timestamp) { + TBinlog binlog = new TBinlog(); + binlog.setDbId(dbId); + binlog.setTableIds(tableIds); + binlog.setType(TBinlogType.ALTER_JOB); + binlog.setCommitSeq(commitSeq); + binlog.setTimestamp(timestamp); + binlog.setTableRef(0); + binlog.setBelong(-1); + return binlog; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java new file mode 100644 index 0000000000..b57bde598e --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/DbBinlogTest.java @@ -0,0 +1,307 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.binlog; + +import org.apache.doris.thrift.TBinlog; +import org.apache.doris.thrift.TBinlogType; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import mockit.Mock; +import mockit.MockUp; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +public class DbBinlogTest { + private long dbId = 10000L; + private long baseTableId = 20000L; + private int tableNum = 5; + private int gcTableNum = 2; + private List tableIds; + + private int totalBinlogNum = 10; + private int expiredBinlogNum = 3; + private long baseNum = 30000L; + + @Before + public void setUp() { + // check args valid + Assert.assertTrue(totalBinlogNum > 0); + Assert.assertTrue(gcTableNum <= tableNum); + Assert.assertTrue(expiredBinlogNum <= totalBinlogNum); + + // gen tableIds + tableIds = Lists.newArrayList(); + for (int i = 0; i < tableNum; ++i) { + tableIds.add(baseTableId + i); + } + + new MockUp() { + @Mock + public long getExpiredMs(long direct) { + return direct; + } + }; + } + + @Test + public void testTableTtlGcCommonCase() { + // init base data + long expiredTime = baseNum + expiredBinlogNum; + Map ttlMap = Maps.newHashMap(); + for (int i = 0; i < tableNum; ++i) { + String key = String.format("%d_%d", dbId, baseTableId + i); + if (i <= gcTableNum) { + ttlMap.put(key, expiredTime); + } else { + ttlMap.put(key, 0L); + } + } + MockBinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(ttlMap); + binlogConfigCache.addDbBinlogConfig(dbId, false, 0L); + + // init & add binlogs + List testBinlogs = Lists.newArrayList(); + Long[] tableLastCommitInfo = new Long[tableNum]; + long maxGcTableId = baseTableId + gcTableNum; + long expiredCommitSeq = -1; + for (int i = 0; i < totalBinlogNum; ++i) { + long tableId = baseTableId + (i / tableNum); + long commitSeq = baseNum + i; + if (tableId <= maxGcTableId) { + expiredCommitSeq = commitSeq; + } + tableLastCommitInfo[i / tableNum] = commitSeq; + TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, baseNum); + testBinlogs.add(binlog); + } + + // init DbBinlog + DBBinlog dbBinlog = null; + + // insert binlogs + for (int i = 0; i < totalBinlogNum; ++i) { + if (dbBinlog == null) { + dbBinlog = new DBBinlog(binlogConfigCache, testBinlogs.get(i)); + } + dbBinlog.addBinlog(testBinlogs.get(i)); + } + + // trigger gc + BinlogTombstone tombstone = dbBinlog.gc(); + + // check binlog status + for (TBinlog binlog : testBinlogs) { + if (binlog.getTableIds().get(0) <= baseTableId + gcTableNum) { + Assert.assertEquals(0, binlog.getTableRef()); + } else { + Assert.assertEquals(1, binlog.getTableRef()); + } + } + + // check dummy binlog + List allBinlogs = Lists.newArrayList(); + dbBinlog.getAllBinlogs(allBinlogs); + for (TBinlog binlog : allBinlogs) { + if (binlog.getType() != TBinlogType.DUMMY) { + break; + } + long belong = binlog.getBelong(); + if (belong < 0) { + Assert.assertEquals(expiredCommitSeq, binlog.getCommitSeq()); + } else if (belong <= maxGcTableId) { + int offset = (int) (belong - baseTableId); + Assert.assertEquals((long) tableLastCommitInfo[offset], binlog.getCommitSeq()); + } else { + Assert.assertEquals(-1, binlog.getCommitSeq()); + } + } + + // check tombstone + Assert.assertFalse(tombstone.isDbBinlogTomstone()); + Assert.assertEquals(expiredCommitSeq, tombstone.getCommitSeq()); + } + + @Test + public void testTableTtlGcBinlogMultiRefCase() { + // init base data + long expiredTime = baseNum + expiredBinlogNum; + Map ttlMap = Maps.newHashMap(); + for (int i = 0; i < tableNum; ++i) { + String key = String.format("%d_%d", dbId, baseTableId + i); + if (i < tableNum - 1) { + ttlMap.put(key, expiredTime); + } else { + ttlMap.put(key, 0L); + } + } + MockBinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(ttlMap); + binlogConfigCache.addDbBinlogConfig(dbId, false, 0L); + + // init & add binlogs + List testBinlogs = Lists.newArrayList(); + for (int i = 0; i < totalBinlogNum; ++i) { + // generate tableIds + long tableId = baseTableId + (i / (tableNum - 1)); + long additionalTableId = (long) (Math.random() * tableNum) + baseTableId; + while (tableId == additionalTableId) { + additionalTableId = (long) (Math.random() * tableNum) + baseTableId; + } + List tableIds = Lists.newArrayList(tableId, additionalTableId); + // init commitSeq + long commitSeq = baseNum + i; + + TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableIds, commitSeq, baseNum); + testBinlogs.add(binlog); + } + + // init dbBinlog + DBBinlog dbBinlog = null; + + // ad additional ref & add to dbBinlog + for (int i = 0; i < totalBinlogNum; ++i) { + TBinlog binlog = testBinlogs.get(i); + if (dbBinlog == null) { + dbBinlog = new DBBinlog(binlogConfigCache, binlog); + } + dbBinlog.addBinlog(binlog); + } + + // trigger gc + dbBinlog.gc(); + + // check binlog status + long unGcTableId = baseTableId + tableNum - 1; + for (TBinlog binlog : testBinlogs) { + if (binlog.getTableIds().contains(unGcTableId)) { + Assert.assertEquals(1, binlog.getTableRef()); + } else { + Assert.assertEquals(0, binlog.getTableRef()); + } + } + } + + @Test + public void testTableCommitSeqGc() { + // init base data + long expiredTime = baseNum + expiredBinlogNum; + Map ttlMap = Maps.newHashMap(); + MockBinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(ttlMap); + binlogConfigCache.addDbBinlogConfig(dbId, true, expiredTime); + + // init & add binlogs + List testBinlogs = Lists.newArrayList(); + for (int i = 0; i < totalBinlogNum; ++i) { + // generate tableIds + long tableId = baseTableId + (i / (tableNum - 1)); + long additionalTableId = (long) (Math.random() * tableNum) + baseTableId; + while (tableId == additionalTableId) { + additionalTableId = (long) (Math.random() * tableNum) + baseTableId; + } + List tableIds = Lists.newArrayList(tableId, additionalTableId); + // init stamp + long stamp = baseNum + i; + + TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableIds, stamp, stamp); + testBinlogs.add(binlog); + } + + // init dbBinlog + DBBinlog dbBinlog = null; + + // ad additional ref & add to dbBinlog + for (int i = 0; i < totalBinlogNum; ++i) { + TBinlog binlog = testBinlogs.get(i); + if (dbBinlog == null) { + dbBinlog = new DBBinlog(binlogConfigCache, binlog); + } + dbBinlog.addBinlog(binlog); + } + + // trigger gc + dbBinlog.gc(); + + // check binlog status + for (TBinlog binlog : testBinlogs) { + if (binlog.getTimestamp() <= expiredTime) { + Assert.assertEquals(0, binlog.getTableRef()); + } else { + Assert.assertTrue(binlog.getTableRef() != 0); + } + } + } + + @Test + public void testAddBinlog() throws NoSuchFieldException, IllegalAccessException { + // set max value num + int maxValue = 12; + + // mock up + new MockUp() { + @Mock + boolean isEnableDB(long dbId) { + return true; + } + + @Mock + boolean isEnableTable(long dbId, long tableId) { + return true; + } + }; + + // reflect field + Field allBinlogsField = DBBinlog.class.getDeclaredField("allBinlogs"); + allBinlogsField.setAccessible(true); + Field tableBinlogMapField = DBBinlog.class.getDeclaredField("tableBinlogMap"); + tableBinlogMapField.setAccessible(true); + + + for (int i = 0; i <= maxValue; ++i) { + TBinlogType type = TBinlogType.findByValue(i); + if (type == TBinlogType.DUMMY) { + continue; + } + TBinlog binlog = BinlogTestUtils.newBinlog(dbId, baseTableId, 1, 1); + binlog.setType(type); + DBBinlog dbBinlog = new DBBinlog(new BinlogConfigCache(), binlog); + + dbBinlog.addBinlog(binlog); + + TreeSet allbinlogs = (TreeSet) allBinlogsField.get(dbBinlog); + Map tableBinlogMap = (Map) tableBinlogMapField.get(dbBinlog); + Assert.assertTrue(allbinlogs.contains(binlog)); + switch (type) { + case CREATE_TABLE: + case DROP_TABLE: { + Assert.assertTrue(tableBinlogMap.isEmpty()); + break; + } + default: { + Assert.assertTrue(tableBinlogMap.containsKey(baseTableId)); + break; + } + } + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java new file mode 100644 index 0000000000..4622171e93 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/MockBinlogConfigCache.java @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.binlog; + +import org.apache.doris.catalog.BinlogConfig; + +import java.util.Map; + +final class MockBinlogConfigCache extends BinlogConfigCache { + private Map mockedConfigs; + + public MockBinlogConfigCache(Map mockedConfigs) { + super(); + this.mockedConfigs = mockedConfigs; + } + + public void addDbBinlogConfig(long dbId, boolean enableBinlog, long expiredTime) { + BinlogConfig config = BinlogTestUtils.newTestBinlogConfig(enableBinlog, expiredTime); + mockedConfigs.put(String.valueOf(dbId), config); + } + + @Override + public BinlogConfig getTableBinlogConfig(long dbId, long tableId) { + return mockedConfigs.get(String.format("%d_%d", dbId, tableId)); + } + + @Override + public BinlogConfig getDBBinlogConfig(long dbId) { + return mockedConfigs.get(String.valueOf(dbId)); + } + + @Override + public boolean isEnableTable(long dbId, long tableId) { + return mockedConfigs.containsKey(String.format("%d_%d", dbId, tableId)); + } + + @Override + public boolean isEnableDB(long dbId) { + BinlogConfig config = mockedConfigs.get(String.valueOf(dbId)); + if (config != null) { + return config.isEnable(); + } + return false; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java new file mode 100644 index 0000000000..b4ecd8a90c --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/binlog/TableBinlogTest.java @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.binlog; + +import org.apache.doris.thrift.TBinlog; + +import com.google.common.collect.Lists; +import mockit.Mock; +import mockit.MockUp; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +public class TableBinlogTest { + private long dbId = 10000; + private long tableId = 20000; + + private int totalBinlogNum = 10; + private int expiredBinlogNum = 3; + private long baseNum = 30000L; + + @Before + public void setUp() { + // check args valid + Assert.assertTrue(expiredBinlogNum <= totalBinlogNum); + } + + @Test + public void testTtlGc() { + // mock BinlogUtils + new MockUp() { + @Mock + public long getExpiredMs(long direct) { + return direct; + } + }; + + // init base data + long expiredTime = baseNum + expiredBinlogNum; + BinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(dbId, tableId, expiredTime); + + // init & add binlogs + List testBinlogs = Lists.newArrayList(); + for (int i = 0; i < totalBinlogNum; ++i) { + TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, baseNum + i, baseNum + i); + testBinlogs.add(binlog); + } + + // init TableBinlog + TableBinlog tableBinlog = null; + + // insert binlogs + for (int i = 0; i < totalBinlogNum; ++i) { + if (tableBinlog == null) { + tableBinlog = new TableBinlog(binlogConfigCache, testBinlogs.get(i), dbId, tableId); + } + tableBinlog.addBinlog(testBinlogs.get(i)); + } + + // trigger ttlGc + BinlogTombstone tombstone = tableBinlog.ttlGc(); + + // check binlog status + for (TBinlog binlog : testBinlogs) { + if (binlog.getTimestamp() <= expiredTime) { + Assert.assertEquals(0, binlog.getTableRef()); + } else { + Assert.assertEquals(1, binlog.getTableRef()); + } + } + + // check tombstone + Assert.assertFalse(tombstone.isDbBinlogTomstone()); + Assert.assertEquals(expiredTime, tombstone.getCommitSeq()); + + // check dummy + TBinlog dummy = tableBinlog.getDummyBinlog(); + Assert.assertEquals(expiredTime, dummy.getCommitSeq()); + } + + @Test + public void testCommitSeqGc() { + // init base data + BinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(dbId, tableId, 0); + + // init & add binlogs + List testBinlogs = Lists.newArrayList(); + for (int i = 0; i < totalBinlogNum; ++i) { + TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, baseNum + i, baseNum + i); + testBinlogs.add(binlog); + } + + // init TableBinlog + TableBinlog tableBinlog = null; + + // insert binlogs + for (int i = 0; i < totalBinlogNum; ++i) { + if (tableBinlog == null) { + tableBinlog = new TableBinlog(binlogConfigCache, testBinlogs.get(i), dbId, tableId); + } + tableBinlog.addBinlog(testBinlogs.get(i)); + } + + // trigger ttlGc + long expiredCommitSeq = baseNum + expiredBinlogNum; + BinlogTombstone tombstone = tableBinlog.commitSeqGc(expiredCommitSeq); + + // check binlog status + for (TBinlog binlog : testBinlogs) { + if (binlog.getTimestamp() <= expiredCommitSeq) { + Assert.assertEquals(0, binlog.getTableRef()); + } else { + Assert.assertEquals(1, binlog.getTableRef()); + } + } + + // check tombstone + Assert.assertFalse(tombstone.isDbBinlogTomstone()); + Assert.assertEquals(expiredCommitSeq, tombstone.getCommitSeq()); + + // check dummy + TBinlog dummy = tableBinlog.getDummyBinlog(); + Assert.assertEquals(expiredCommitSeq, dummy.getCommitSeq()); + } +}