[UT](binlog) Add BinlogManager unit test #24486
add BinlogManager unit test add DBBinlog unit test add TableBinlog unit test
This commit is contained in:
@ -0,0 +1,382 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.binlog;
|
||||
|
||||
import org.apache.doris.catalog.BinlogConfig;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.persist.BinlogGcInfo;
|
||||
import org.apache.doris.thrift.TBinlog;
|
||||
import org.apache.doris.thrift.TBinlogType;
|
||||
import org.apache.doris.thrift.TStatus;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import org.apache.hadoop.util.Lists;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class BinlogManagerTest {
|
||||
private Map<Long, List<Long>> frameWork;
|
||||
|
||||
private int dbNum = 2;
|
||||
private int tableNumPerDb = 3;
|
||||
|
||||
private long dbBaseId = 10000;
|
||||
private long tableBaseId = 100;
|
||||
private long baseNum = 10000;
|
||||
private long timeNow = baseNum;
|
||||
private long ttl = 3;
|
||||
|
||||
private boolean enableDbBinlog = false;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
Config.enable_feature_binlog = true;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
Assert.assertTrue(tableNumPerDb < 100);
|
||||
frameWork = Maps.newHashMap();
|
||||
for (int dbOff = 1; dbOff <= dbNum; ++dbOff) {
|
||||
long dbId = dbOff * dbBaseId;
|
||||
List<Long> tableIds = Lists.newArrayList();
|
||||
for (int tblOff = 1; tblOff <= tableNumPerDb; ++tblOff) {
|
||||
tableIds.add(tableBaseId * tblOff + dbId);
|
||||
}
|
||||
frameWork.put(dbId, tableIds);
|
||||
}
|
||||
|
||||
new MockUp<BinlogConfigCache>() {
|
||||
@Mock
|
||||
public BinlogConfig getDBBinlogConfig(long dbId) {
|
||||
return new BinlogConfig();
|
||||
}
|
||||
|
||||
@Mock
|
||||
public BinlogConfig getTableBinlogConfig(long dbId, long tableId) {
|
||||
return new BinlogConfig();
|
||||
}
|
||||
|
||||
@Mock
|
||||
public boolean isEnableTable(long dbId, long tableId) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Mock
|
||||
public boolean isEnableDB(long dbId) {
|
||||
return enableDbBinlog;
|
||||
}
|
||||
};
|
||||
|
||||
new MockUp<BinlogConfig>() {
|
||||
@Mock
|
||||
public long getTtlSeconds() {
|
||||
return ttl;
|
||||
}
|
||||
|
||||
@Mock
|
||||
public boolean isEnable() {
|
||||
return enableDbBinlog;
|
||||
}
|
||||
};
|
||||
|
||||
new MockUp<Env>() {
|
||||
@Mock
|
||||
public InternalCatalog getCurrentInternalCatalog() {
|
||||
return new InternalCatalog();
|
||||
}
|
||||
};
|
||||
|
||||
new MockUp<InternalCatalog>() {
|
||||
@Mock
|
||||
public Database getDbNullable(long dbId) {
|
||||
return new Database();
|
||||
}
|
||||
};
|
||||
|
||||
new MockUp<Database>() {
|
||||
@Mock
|
||||
public BinlogConfig getBinlogConfig() {
|
||||
return new BinlogConfig();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBinlog()
|
||||
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
|
||||
// reflect BinlogManager
|
||||
Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class);
|
||||
addBinlog.setAccessible(true);
|
||||
|
||||
// init binlog manager & addBinlog
|
||||
BinlogManager manager = new BinlogManager();
|
||||
|
||||
// insert table binlogs
|
||||
int binlogNum = 10;
|
||||
for (int i = 1; i <= binlogNum; ++i) {
|
||||
TBinlog binlog = BinlogTestUtils.newBinlog(dbBaseId, tableBaseId, i, i);
|
||||
if (i % 2 == 0) {
|
||||
binlog.setType(TBinlogType.CREATE_TABLE);
|
||||
}
|
||||
addBinlog.invoke(manager, binlog);
|
||||
|
||||
}
|
||||
|
||||
// test get
|
||||
Pair<TStatus, TBinlog> pair;
|
||||
|
||||
// get too old
|
||||
pair = manager.getBinlog(dbBaseId, tableBaseId, -99);
|
||||
Assert.assertEquals(TStatusCode.BINLOG_TOO_OLD_COMMIT_SEQ, pair.first.getStatusCode());
|
||||
Assert.assertEquals(TBinlogType.DUMMY, pair.second.getType());
|
||||
|
||||
// get odd commit seq in table level ok
|
||||
pair = manager.getBinlog(dbBaseId, tableBaseId, 5);
|
||||
Assert.assertEquals(TStatusCode.OK, pair.first.getStatusCode());
|
||||
Assert.assertEquals(5 + 2, pair.second.getCommitSeq());
|
||||
|
||||
// get even commit seq in table level ok
|
||||
pair = manager.getBinlog(dbBaseId, tableBaseId, 6);
|
||||
Assert.assertEquals(TStatusCode.OK, pair.first.getStatusCode());
|
||||
Assert.assertEquals(6 + 1, pair.second.getCommitSeq());
|
||||
|
||||
// get odd commit seq in db level ok
|
||||
pair = manager.getBinlog(dbBaseId, -1, 5);
|
||||
Assert.assertEquals(TStatusCode.OK, pair.first.getStatusCode());
|
||||
Assert.assertEquals(5 + 1, pair.second.getCommitSeq());
|
||||
|
||||
// get even commit seq in db level ok
|
||||
pair = manager.getBinlog(dbBaseId, -1, 6);
|
||||
Assert.assertEquals(TStatusCode.OK, pair.first.getStatusCode());
|
||||
Assert.assertEquals(6 + 1, pair.second.getCommitSeq());
|
||||
|
||||
// get too new
|
||||
pair = manager.getBinlog(dbBaseId, tableBaseId, 999);
|
||||
Assert.assertEquals(TStatusCode.BINLOG_TOO_NEW_COMMIT_SEQ, pair.first.getStatusCode());
|
||||
Assert.assertNull(pair.second);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPersist() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException,
|
||||
IOException, NoSuchFieldException {
|
||||
// reflect BinlogManager
|
||||
// addBinlog method
|
||||
Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class);
|
||||
addBinlog.setAccessible(true);
|
||||
// dbBinlogMap
|
||||
Field dbBinlogMapField = BinlogManager.class.getDeclaredField("dbBinlogMap");
|
||||
dbBinlogMapField.setAccessible(true);
|
||||
|
||||
// init binlog manager & addBinlog
|
||||
BinlogManager originManager = new BinlogManager();
|
||||
|
||||
// insert binlogs
|
||||
long commitSeq = baseNum;
|
||||
for (Map.Entry<Long, List<Long>> dbEntry : frameWork.entrySet()) {
|
||||
long dbId = dbEntry.getKey();
|
||||
for (long tableId : dbEntry.getValue()) {
|
||||
addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, commitSeq));
|
||||
++commitSeq;
|
||||
}
|
||||
}
|
||||
|
||||
// init output stream
|
||||
ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
|
||||
DataOutputStream outputStream = new DataOutputStream(arrayOutputStream);
|
||||
|
||||
// serialize binlogs
|
||||
originManager.write(outputStream, 0L);
|
||||
|
||||
// init another binlog manager
|
||||
BinlogManager newManager = new BinlogManager();
|
||||
|
||||
// deserialize binlogs
|
||||
ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray());
|
||||
DataInputStream inputStream = new DataInputStream(arrayInputStream);
|
||||
newManager.read(inputStream, 0L);
|
||||
|
||||
// get origin & new dbbinlog's allbinlogs
|
||||
Map<Long, DBBinlog> originDbBinlogMap = (Map<Long, DBBinlog>) dbBinlogMapField.get(originManager);
|
||||
Map<Long, DBBinlog> newDbBinlogMap = (Map<Long, DBBinlog>) dbBinlogMapField.get(newManager);
|
||||
Assert.assertEquals(originDbBinlogMap.size(), newDbBinlogMap.size());
|
||||
for (long dbId : frameWork.keySet()) {
|
||||
List<TBinlog> originBinlogList = Lists.newArrayList();
|
||||
List<TBinlog> newBinlogList = Lists.newArrayList();
|
||||
originDbBinlogMap.get(dbId).getAllBinlogs(originBinlogList);
|
||||
newDbBinlogMap.get(dbId).getAllBinlogs(newBinlogList);
|
||||
Assert.assertEquals(originBinlogList.size(), newBinlogList.size());
|
||||
for (int i = 0; i < originBinlogList.size(); ++i) {
|
||||
Assert.assertEquals(originBinlogList.get(i).getCommitSeq(),
|
||||
newBinlogList.get(i).getCommitSeq());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplayGcFromTableLevel() throws NoSuchMethodException, InvocationTargetException,
|
||||
IllegalAccessException, NoSuchFieldException {
|
||||
// MockUp
|
||||
new MockUp<BinlogUtils>() {
|
||||
@Mock
|
||||
public long getExpiredMs(long ttl) {
|
||||
return timeNow - ttl;
|
||||
}
|
||||
};
|
||||
|
||||
// reflect BinlogManager
|
||||
// addBinlog method
|
||||
Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class);
|
||||
addBinlog.setAccessible(true);
|
||||
// dbBinlogMap
|
||||
Field dbBinlogMapField = BinlogManager.class.getDeclaredField("dbBinlogMap");
|
||||
dbBinlogMapField.setAccessible(true);
|
||||
|
||||
// init binlog origin & new manager
|
||||
BinlogManager originManager = new BinlogManager();
|
||||
BinlogManager newManager = new BinlogManager();
|
||||
|
||||
// insert binlogs
|
||||
long commitSeq = 0;
|
||||
for (Map.Entry<Long, List<Long>> dbEntry : frameWork.entrySet()) {
|
||||
long dbId = dbEntry.getKey();
|
||||
for (long tableId : dbEntry.getValue()) {
|
||||
if ((tableId / tableBaseId) % 2 != 0) {
|
||||
addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow));
|
||||
addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, timeNow));
|
||||
++commitSeq;
|
||||
} else {
|
||||
addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, 0, 0));
|
||||
addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, 0, 0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// origin manager gc & get BinlogGcInfo
|
||||
BinlogGcInfo info = new BinlogGcInfo(originManager.gc());
|
||||
|
||||
// new manager replay gc
|
||||
newManager.replayGc(info);
|
||||
|
||||
// get origin & new dbbinlog's allbinlogs
|
||||
Map<Long, DBBinlog> originDbBinlogMap = (Map<Long, DBBinlog>) dbBinlogMapField.get(originManager);
|
||||
Map<Long, DBBinlog> newDbBinlogMap = (Map<Long, DBBinlog>) dbBinlogMapField.get(newManager);
|
||||
Assert.assertEquals(originDbBinlogMap.size(), newDbBinlogMap.size());
|
||||
for (long dbId : frameWork.keySet()) {
|
||||
List<TBinlog> originBinlogList = Lists.newArrayList();
|
||||
List<TBinlog> newBinlogList = Lists.newArrayList();
|
||||
originDbBinlogMap.get(dbId).getAllBinlogs(originBinlogList);
|
||||
newDbBinlogMap.get(dbId).getAllBinlogs(newBinlogList);
|
||||
Assert.assertEquals(originBinlogList.size(), newBinlogList.size());
|
||||
for (int i = 0; i < originBinlogList.size(); ++i) {
|
||||
TBinlog originBinlog = originBinlogList.get(i);
|
||||
TBinlog newBinlog = newBinlogList.get(i);
|
||||
Assert.assertEquals(originBinlog.getCommitSeq(), newBinlog.getCommitSeq());
|
||||
if (newBinlog.getType() != TBinlogType.DUMMY) {
|
||||
Assert.assertTrue(newBinlog.getTimestamp() > timeNow - ttl);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplayGcFromDbLevel() throws NoSuchMethodException, InvocationTargetException,
|
||||
IllegalAccessException, NoSuchFieldException {
|
||||
// MockUp
|
||||
new MockUp<BinlogUtils>() {
|
||||
@Mock
|
||||
public long getExpiredMs(long ttl) {
|
||||
return timeNow - ttl;
|
||||
}
|
||||
};
|
||||
|
||||
// set dbBinlogEnable
|
||||
enableDbBinlog = true;
|
||||
|
||||
// reflect BinlogManager
|
||||
// addBinlog method
|
||||
Method addBinlog = BinlogManager.class.getDeclaredMethod("addBinlog", TBinlog.class);
|
||||
addBinlog.setAccessible(true);
|
||||
// dbBinlogMap
|
||||
Field dbBinlogMapField = BinlogManager.class.getDeclaredField("dbBinlogMap");
|
||||
dbBinlogMapField.setAccessible(true);
|
||||
|
||||
// init binlog origin & new manager
|
||||
BinlogManager originManager = new BinlogManager();
|
||||
BinlogManager newManager = new BinlogManager();
|
||||
|
||||
// insert binlogs
|
||||
long commitSeq = baseNum;
|
||||
for (Map.Entry<Long, List<Long>> dbEntry : frameWork.entrySet()) {
|
||||
long dbId = dbEntry.getKey();
|
||||
for (long tableId : dbEntry.getValue()) {
|
||||
++commitSeq;
|
||||
addBinlog.invoke(originManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, commitSeq));
|
||||
addBinlog.invoke(newManager, BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, commitSeq));
|
||||
}
|
||||
}
|
||||
timeNow = commitSeq;
|
||||
|
||||
// origin manager gc & get BinlogGcInfo
|
||||
BinlogGcInfo info = new BinlogGcInfo(originManager.gc());
|
||||
|
||||
// new manager replay gc
|
||||
newManager.replayGc(info);
|
||||
|
||||
// get origin & new dbbinlog's allbinlogs
|
||||
Map<Long, DBBinlog> originDbBinlogMap = (Map<Long, DBBinlog>) dbBinlogMapField.get(originManager);
|
||||
Map<Long, DBBinlog> newDbBinlogMap = (Map<Long, DBBinlog>) dbBinlogMapField.get(newManager);
|
||||
Assert.assertEquals(originDbBinlogMap.size(), newDbBinlogMap.size());
|
||||
for (Map.Entry<Long, List<Long>> dbEntry : frameWork.entrySet()) {
|
||||
long dbId = dbEntry.getKey();
|
||||
List<TBinlog> originBinlogList = Lists.newArrayList();
|
||||
List<TBinlog> newBinlogList = Lists.newArrayList();
|
||||
originDbBinlogMap.get(dbId).getAllBinlogs(originBinlogList);
|
||||
newDbBinlogMap.get(dbId).getAllBinlogs(newBinlogList);
|
||||
Assert.assertEquals(originBinlogList.size(), newBinlogList.size());
|
||||
for (int i = 0; i < originBinlogList.size(); ++i) {
|
||||
TBinlog originBinlog = originBinlogList.get(i);
|
||||
TBinlog newBinlog = newBinlogList.get(i);
|
||||
Assert.assertEquals(originBinlog.getCommitSeq(), newBinlog.getCommitSeq());
|
||||
if (newBinlog.getType() != TBinlogType.DUMMY) {
|
||||
Assert.assertTrue(newBinlog.getCommitSeq() > timeNow - ttl);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,76 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.binlog;
|
||||
|
||||
import org.apache.doris.catalog.BinlogConfig;
|
||||
import org.apache.doris.thrift.TBinlog;
|
||||
import org.apache.doris.thrift.TBinlogType;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class BinlogTestUtils {
|
||||
|
||||
public static final long MAX_BYTES = 0x7fffffffffffffffL;
|
||||
public static final long MAX_HISTORY_NUMS = 0x7fffffffffffffffL;
|
||||
|
||||
public static BinlogConfig newTestBinlogConfig(boolean enableBinlog, long expiredTime) {
|
||||
return new BinlogConfig(enableBinlog, expiredTime, MAX_BYTES, MAX_HISTORY_NUMS);
|
||||
}
|
||||
|
||||
public static BinlogConfigCache newMockBinlogConfigCache(long dbId, long tableId, long expiredTime) {
|
||||
BinlogConfig binlogConfig = newTestBinlogConfig(true, expiredTime);
|
||||
return new MockBinlogConfigCache(
|
||||
Collections.singletonMap(String.format("%d_%d", dbId, tableId), binlogConfig));
|
||||
}
|
||||
|
||||
public static MockBinlogConfigCache newMockBinlogConfigCache(Map<String, Long> ttlMap) {
|
||||
Map<String, BinlogConfig> configMap = Maps.newHashMap();
|
||||
for (Map.Entry<String, Long> entry : ttlMap.entrySet()) {
|
||||
configMap.put(entry.getKey(), newTestBinlogConfig(true, entry.getValue()));
|
||||
}
|
||||
return new MockBinlogConfigCache(configMap);
|
||||
}
|
||||
|
||||
public static TBinlog newBinlog(long dbId, long tableId, long commitSeq, long timestamp) {
|
||||
TBinlog binlog = new TBinlog();
|
||||
binlog.setDbId(dbId);
|
||||
binlog.setTableIds(Collections.singletonList(tableId));
|
||||
binlog.setType(TBinlogType.ALTER_JOB);
|
||||
binlog.setCommitSeq(commitSeq);
|
||||
binlog.setTimestamp(timestamp);
|
||||
binlog.setTableRef(0);
|
||||
binlog.setBelong(-1);
|
||||
return binlog;
|
||||
}
|
||||
|
||||
public static TBinlog newBinlog(long dbId, List<Long> tableIds, long commitSeq, long timestamp) {
|
||||
TBinlog binlog = new TBinlog();
|
||||
binlog.setDbId(dbId);
|
||||
binlog.setTableIds(tableIds);
|
||||
binlog.setType(TBinlogType.ALTER_JOB);
|
||||
binlog.setCommitSeq(commitSeq);
|
||||
binlog.setTimestamp(timestamp);
|
||||
binlog.setTableRef(0);
|
||||
binlog.setBelong(-1);
|
||||
return binlog;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,307 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.binlog;
|
||||
|
||||
import org.apache.doris.thrift.TBinlog;
|
||||
import org.apache.doris.thrift.TBinlogType;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeSet;
|
||||
|
||||
public class DbBinlogTest {
|
||||
private long dbId = 10000L;
|
||||
private long baseTableId = 20000L;
|
||||
private int tableNum = 5;
|
||||
private int gcTableNum = 2;
|
||||
private List<Long> tableIds;
|
||||
|
||||
private int totalBinlogNum = 10;
|
||||
private int expiredBinlogNum = 3;
|
||||
private long baseNum = 30000L;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
// check args valid
|
||||
Assert.assertTrue(totalBinlogNum > 0);
|
||||
Assert.assertTrue(gcTableNum <= tableNum);
|
||||
Assert.assertTrue(expiredBinlogNum <= totalBinlogNum);
|
||||
|
||||
// gen tableIds
|
||||
tableIds = Lists.newArrayList();
|
||||
for (int i = 0; i < tableNum; ++i) {
|
||||
tableIds.add(baseTableId + i);
|
||||
}
|
||||
|
||||
new MockUp<BinlogUtils>() {
|
||||
@Mock
|
||||
public long getExpiredMs(long direct) {
|
||||
return direct;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableTtlGcCommonCase() {
|
||||
// init base data
|
||||
long expiredTime = baseNum + expiredBinlogNum;
|
||||
Map<String, Long> ttlMap = Maps.newHashMap();
|
||||
for (int i = 0; i < tableNum; ++i) {
|
||||
String key = String.format("%d_%d", dbId, baseTableId + i);
|
||||
if (i <= gcTableNum) {
|
||||
ttlMap.put(key, expiredTime);
|
||||
} else {
|
||||
ttlMap.put(key, 0L);
|
||||
}
|
||||
}
|
||||
MockBinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(ttlMap);
|
||||
binlogConfigCache.addDbBinlogConfig(dbId, false, 0L);
|
||||
|
||||
// init & add binlogs
|
||||
List<TBinlog> testBinlogs = Lists.newArrayList();
|
||||
Long[] tableLastCommitInfo = new Long[tableNum];
|
||||
long maxGcTableId = baseTableId + gcTableNum;
|
||||
long expiredCommitSeq = -1;
|
||||
for (int i = 0; i < totalBinlogNum; ++i) {
|
||||
long tableId = baseTableId + (i / tableNum);
|
||||
long commitSeq = baseNum + i;
|
||||
if (tableId <= maxGcTableId) {
|
||||
expiredCommitSeq = commitSeq;
|
||||
}
|
||||
tableLastCommitInfo[i / tableNum] = commitSeq;
|
||||
TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, commitSeq, baseNum);
|
||||
testBinlogs.add(binlog);
|
||||
}
|
||||
|
||||
// init DbBinlog
|
||||
DBBinlog dbBinlog = null;
|
||||
|
||||
// insert binlogs
|
||||
for (int i = 0; i < totalBinlogNum; ++i) {
|
||||
if (dbBinlog == null) {
|
||||
dbBinlog = new DBBinlog(binlogConfigCache, testBinlogs.get(i));
|
||||
}
|
||||
dbBinlog.addBinlog(testBinlogs.get(i));
|
||||
}
|
||||
|
||||
// trigger gc
|
||||
BinlogTombstone tombstone = dbBinlog.gc();
|
||||
|
||||
// check binlog status
|
||||
for (TBinlog binlog : testBinlogs) {
|
||||
if (binlog.getTableIds().get(0) <= baseTableId + gcTableNum) {
|
||||
Assert.assertEquals(0, binlog.getTableRef());
|
||||
} else {
|
||||
Assert.assertEquals(1, binlog.getTableRef());
|
||||
}
|
||||
}
|
||||
|
||||
// check dummy binlog
|
||||
List<TBinlog> allBinlogs = Lists.newArrayList();
|
||||
dbBinlog.getAllBinlogs(allBinlogs);
|
||||
for (TBinlog binlog : allBinlogs) {
|
||||
if (binlog.getType() != TBinlogType.DUMMY) {
|
||||
break;
|
||||
}
|
||||
long belong = binlog.getBelong();
|
||||
if (belong < 0) {
|
||||
Assert.assertEquals(expiredCommitSeq, binlog.getCommitSeq());
|
||||
} else if (belong <= maxGcTableId) {
|
||||
int offset = (int) (belong - baseTableId);
|
||||
Assert.assertEquals((long) tableLastCommitInfo[offset], binlog.getCommitSeq());
|
||||
} else {
|
||||
Assert.assertEquals(-1, binlog.getCommitSeq());
|
||||
}
|
||||
}
|
||||
|
||||
// check tombstone
|
||||
Assert.assertFalse(tombstone.isDbBinlogTomstone());
|
||||
Assert.assertEquals(expiredCommitSeq, tombstone.getCommitSeq());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableTtlGcBinlogMultiRefCase() {
|
||||
// init base data
|
||||
long expiredTime = baseNum + expiredBinlogNum;
|
||||
Map<String, Long> ttlMap = Maps.newHashMap();
|
||||
for (int i = 0; i < tableNum; ++i) {
|
||||
String key = String.format("%d_%d", dbId, baseTableId + i);
|
||||
if (i < tableNum - 1) {
|
||||
ttlMap.put(key, expiredTime);
|
||||
} else {
|
||||
ttlMap.put(key, 0L);
|
||||
}
|
||||
}
|
||||
MockBinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(ttlMap);
|
||||
binlogConfigCache.addDbBinlogConfig(dbId, false, 0L);
|
||||
|
||||
// init & add binlogs
|
||||
List<TBinlog> testBinlogs = Lists.newArrayList();
|
||||
for (int i = 0; i < totalBinlogNum; ++i) {
|
||||
// generate tableIds
|
||||
long tableId = baseTableId + (i / (tableNum - 1));
|
||||
long additionalTableId = (long) (Math.random() * tableNum) + baseTableId;
|
||||
while (tableId == additionalTableId) {
|
||||
additionalTableId = (long) (Math.random() * tableNum) + baseTableId;
|
||||
}
|
||||
List<Long> tableIds = Lists.newArrayList(tableId, additionalTableId);
|
||||
// init commitSeq
|
||||
long commitSeq = baseNum + i;
|
||||
|
||||
TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableIds, commitSeq, baseNum);
|
||||
testBinlogs.add(binlog);
|
||||
}
|
||||
|
||||
// init dbBinlog
|
||||
DBBinlog dbBinlog = null;
|
||||
|
||||
// ad additional ref & add to dbBinlog
|
||||
for (int i = 0; i < totalBinlogNum; ++i) {
|
||||
TBinlog binlog = testBinlogs.get(i);
|
||||
if (dbBinlog == null) {
|
||||
dbBinlog = new DBBinlog(binlogConfigCache, binlog);
|
||||
}
|
||||
dbBinlog.addBinlog(binlog);
|
||||
}
|
||||
|
||||
// trigger gc
|
||||
dbBinlog.gc();
|
||||
|
||||
// check binlog status
|
||||
long unGcTableId = baseTableId + tableNum - 1;
|
||||
for (TBinlog binlog : testBinlogs) {
|
||||
if (binlog.getTableIds().contains(unGcTableId)) {
|
||||
Assert.assertEquals(1, binlog.getTableRef());
|
||||
} else {
|
||||
Assert.assertEquals(0, binlog.getTableRef());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableCommitSeqGc() {
|
||||
// init base data
|
||||
long expiredTime = baseNum + expiredBinlogNum;
|
||||
Map<String, Long> ttlMap = Maps.newHashMap();
|
||||
MockBinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(ttlMap);
|
||||
binlogConfigCache.addDbBinlogConfig(dbId, true, expiredTime);
|
||||
|
||||
// init & add binlogs
|
||||
List<TBinlog> testBinlogs = Lists.newArrayList();
|
||||
for (int i = 0; i < totalBinlogNum; ++i) {
|
||||
// generate tableIds
|
||||
long tableId = baseTableId + (i / (tableNum - 1));
|
||||
long additionalTableId = (long) (Math.random() * tableNum) + baseTableId;
|
||||
while (tableId == additionalTableId) {
|
||||
additionalTableId = (long) (Math.random() * tableNum) + baseTableId;
|
||||
}
|
||||
List<Long> tableIds = Lists.newArrayList(tableId, additionalTableId);
|
||||
// init stamp
|
||||
long stamp = baseNum + i;
|
||||
|
||||
TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableIds, stamp, stamp);
|
||||
testBinlogs.add(binlog);
|
||||
}
|
||||
|
||||
// init dbBinlog
|
||||
DBBinlog dbBinlog = null;
|
||||
|
||||
// ad additional ref & add to dbBinlog
|
||||
for (int i = 0; i < totalBinlogNum; ++i) {
|
||||
TBinlog binlog = testBinlogs.get(i);
|
||||
if (dbBinlog == null) {
|
||||
dbBinlog = new DBBinlog(binlogConfigCache, binlog);
|
||||
}
|
||||
dbBinlog.addBinlog(binlog);
|
||||
}
|
||||
|
||||
// trigger gc
|
||||
dbBinlog.gc();
|
||||
|
||||
// check binlog status
|
||||
for (TBinlog binlog : testBinlogs) {
|
||||
if (binlog.getTimestamp() <= expiredTime) {
|
||||
Assert.assertEquals(0, binlog.getTableRef());
|
||||
} else {
|
||||
Assert.assertTrue(binlog.getTableRef() != 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddBinlog() throws NoSuchFieldException, IllegalAccessException {
|
||||
// set max value num
|
||||
int maxValue = 12;
|
||||
|
||||
// mock up
|
||||
new MockUp<BinlogConfigCache>() {
|
||||
@Mock
|
||||
boolean isEnableDB(long dbId) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Mock
|
||||
boolean isEnableTable(long dbId, long tableId) {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
// reflect field
|
||||
Field allBinlogsField = DBBinlog.class.getDeclaredField("allBinlogs");
|
||||
allBinlogsField.setAccessible(true);
|
||||
Field tableBinlogMapField = DBBinlog.class.getDeclaredField("tableBinlogMap");
|
||||
tableBinlogMapField.setAccessible(true);
|
||||
|
||||
|
||||
for (int i = 0; i <= maxValue; ++i) {
|
||||
TBinlogType type = TBinlogType.findByValue(i);
|
||||
if (type == TBinlogType.DUMMY) {
|
||||
continue;
|
||||
}
|
||||
TBinlog binlog = BinlogTestUtils.newBinlog(dbId, baseTableId, 1, 1);
|
||||
binlog.setType(type);
|
||||
DBBinlog dbBinlog = new DBBinlog(new BinlogConfigCache(), binlog);
|
||||
|
||||
dbBinlog.addBinlog(binlog);
|
||||
|
||||
TreeSet<TBinlog> allbinlogs = (TreeSet<TBinlog>) allBinlogsField.get(dbBinlog);
|
||||
Map<Long, TableBinlog> tableBinlogMap = (Map<Long, TableBinlog>) tableBinlogMapField.get(dbBinlog);
|
||||
Assert.assertTrue(allbinlogs.contains(binlog));
|
||||
switch (type) {
|
||||
case CREATE_TABLE:
|
||||
case DROP_TABLE: {
|
||||
Assert.assertTrue(tableBinlogMap.isEmpty());
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
Assert.assertTrue(tableBinlogMap.containsKey(baseTableId));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,60 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.binlog;
|
||||
|
||||
import org.apache.doris.catalog.BinlogConfig;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
final class MockBinlogConfigCache extends BinlogConfigCache {
|
||||
private Map<String, BinlogConfig> mockedConfigs;
|
||||
|
||||
public MockBinlogConfigCache(Map<String, BinlogConfig> mockedConfigs) {
|
||||
super();
|
||||
this.mockedConfigs = mockedConfigs;
|
||||
}
|
||||
|
||||
public void addDbBinlogConfig(long dbId, boolean enableBinlog, long expiredTime) {
|
||||
BinlogConfig config = BinlogTestUtils.newTestBinlogConfig(enableBinlog, expiredTime);
|
||||
mockedConfigs.put(String.valueOf(dbId), config);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BinlogConfig getTableBinlogConfig(long dbId, long tableId) {
|
||||
return mockedConfigs.get(String.format("%d_%d", dbId, tableId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public BinlogConfig getDBBinlogConfig(long dbId) {
|
||||
return mockedConfigs.get(String.valueOf(dbId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnableTable(long dbId, long tableId) {
|
||||
return mockedConfigs.containsKey(String.format("%d_%d", dbId, tableId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnableDB(long dbId) {
|
||||
BinlogConfig config = mockedConfigs.get(String.valueOf(dbId));
|
||||
if (config != null) {
|
||||
return config.isEnable();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,142 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.binlog;
|
||||
|
||||
import org.apache.doris.thrift.TBinlog;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class TableBinlogTest {
|
||||
private long dbId = 10000;
|
||||
private long tableId = 20000;
|
||||
|
||||
private int totalBinlogNum = 10;
|
||||
private int expiredBinlogNum = 3;
|
||||
private long baseNum = 30000L;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
// check args valid
|
||||
Assert.assertTrue(expiredBinlogNum <= totalBinlogNum);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTtlGc() {
|
||||
// mock BinlogUtils
|
||||
new MockUp<BinlogUtils>() {
|
||||
@Mock
|
||||
public long getExpiredMs(long direct) {
|
||||
return direct;
|
||||
}
|
||||
};
|
||||
|
||||
// init base data
|
||||
long expiredTime = baseNum + expiredBinlogNum;
|
||||
BinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(dbId, tableId, expiredTime);
|
||||
|
||||
// init & add binlogs
|
||||
List<TBinlog> testBinlogs = Lists.newArrayList();
|
||||
for (int i = 0; i < totalBinlogNum; ++i) {
|
||||
TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, baseNum + i, baseNum + i);
|
||||
testBinlogs.add(binlog);
|
||||
}
|
||||
|
||||
// init TableBinlog
|
||||
TableBinlog tableBinlog = null;
|
||||
|
||||
// insert binlogs
|
||||
for (int i = 0; i < totalBinlogNum; ++i) {
|
||||
if (tableBinlog == null) {
|
||||
tableBinlog = new TableBinlog(binlogConfigCache, testBinlogs.get(i), dbId, tableId);
|
||||
}
|
||||
tableBinlog.addBinlog(testBinlogs.get(i));
|
||||
}
|
||||
|
||||
// trigger ttlGc
|
||||
BinlogTombstone tombstone = tableBinlog.ttlGc();
|
||||
|
||||
// check binlog status
|
||||
for (TBinlog binlog : testBinlogs) {
|
||||
if (binlog.getTimestamp() <= expiredTime) {
|
||||
Assert.assertEquals(0, binlog.getTableRef());
|
||||
} else {
|
||||
Assert.assertEquals(1, binlog.getTableRef());
|
||||
}
|
||||
}
|
||||
|
||||
// check tombstone
|
||||
Assert.assertFalse(tombstone.isDbBinlogTomstone());
|
||||
Assert.assertEquals(expiredTime, tombstone.getCommitSeq());
|
||||
|
||||
// check dummy
|
||||
TBinlog dummy = tableBinlog.getDummyBinlog();
|
||||
Assert.assertEquals(expiredTime, dummy.getCommitSeq());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCommitSeqGc() {
|
||||
// init base data
|
||||
BinlogConfigCache binlogConfigCache = BinlogTestUtils.newMockBinlogConfigCache(dbId, tableId, 0);
|
||||
|
||||
// init & add binlogs
|
||||
List<TBinlog> testBinlogs = Lists.newArrayList();
|
||||
for (int i = 0; i < totalBinlogNum; ++i) {
|
||||
TBinlog binlog = BinlogTestUtils.newBinlog(dbId, tableId, baseNum + i, baseNum + i);
|
||||
testBinlogs.add(binlog);
|
||||
}
|
||||
|
||||
// init TableBinlog
|
||||
TableBinlog tableBinlog = null;
|
||||
|
||||
// insert binlogs
|
||||
for (int i = 0; i < totalBinlogNum; ++i) {
|
||||
if (tableBinlog == null) {
|
||||
tableBinlog = new TableBinlog(binlogConfigCache, testBinlogs.get(i), dbId, tableId);
|
||||
}
|
||||
tableBinlog.addBinlog(testBinlogs.get(i));
|
||||
}
|
||||
|
||||
// trigger ttlGc
|
||||
long expiredCommitSeq = baseNum + expiredBinlogNum;
|
||||
BinlogTombstone tombstone = tableBinlog.commitSeqGc(expiredCommitSeq);
|
||||
|
||||
// check binlog status
|
||||
for (TBinlog binlog : testBinlogs) {
|
||||
if (binlog.getTimestamp() <= expiredCommitSeq) {
|
||||
Assert.assertEquals(0, binlog.getTableRef());
|
||||
} else {
|
||||
Assert.assertEquals(1, binlog.getTableRef());
|
||||
}
|
||||
}
|
||||
|
||||
// check tombstone
|
||||
Assert.assertFalse(tombstone.isDbBinlogTomstone());
|
||||
Assert.assertEquals(expiredCommitSeq, tombstone.getCommitSeq());
|
||||
|
||||
// check dummy
|
||||
TBinlog dummy = tableBinlog.getDummyBinlog();
|
||||
Assert.assertEquals(expiredCommitSeq, dummy.getCommitSeq());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user