[Enhancement](binlog) Add binlog enable diable check in BinlogManager (#22173)
Signed-off-by: Jack Drogon <jack.xsuperman@gmail.com>
This commit is contained in:
@ -0,0 +1,146 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.binlog;
|
||||
|
||||
import org.apache.doris.catalog.BinlogConfig;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
public class BinlogConfigCache {
|
||||
private static final Logger LOG = LogManager.getLogger(BinlogConfigCache.class);
|
||||
|
||||
private Map<Long, BinlogConfig> dbTableBinlogEnableMap; // db or table all use id
|
||||
private ReentrantReadWriteLock lock;
|
||||
|
||||
public BinlogConfigCache() {
|
||||
dbTableBinlogEnableMap = new HashMap<Long, BinlogConfig>();
|
||||
lock = new ReentrantReadWriteLock();
|
||||
}
|
||||
|
||||
public BinlogConfig getDBBinlogConfig(long dbId) {
|
||||
lock.readLock().lock();
|
||||
BinlogConfig binlogConfig = dbTableBinlogEnableMap.get(dbId);
|
||||
lock.readLock().unlock();
|
||||
if (binlogConfig != null) {
|
||||
return binlogConfig;
|
||||
}
|
||||
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
|
||||
if (db == null) {
|
||||
LOG.warn("db not found. dbId: {}", dbId);
|
||||
return null;
|
||||
}
|
||||
|
||||
binlogConfig = db.getBinlogConfig();
|
||||
dbTableBinlogEnableMap.put(dbId, binlogConfig);
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
return binlogConfig;
|
||||
}
|
||||
|
||||
public boolean isEnableDB(long dbId) {
|
||||
BinlogConfig dBinlogConfig = getDBBinlogConfig(dbId);
|
||||
if (dBinlogConfig == null) {
|
||||
return false;
|
||||
}
|
||||
return dBinlogConfig.isEnable();
|
||||
}
|
||||
|
||||
public long getDBTtlSeconds(long dbId) {
|
||||
BinlogConfig dBinlogConfig = getDBBinlogConfig(dbId);
|
||||
if (dBinlogConfig == null) {
|
||||
return BinlogConfig.TTL_SECONDS;
|
||||
}
|
||||
return dBinlogConfig.getTtlSeconds();
|
||||
}
|
||||
|
||||
public BinlogConfig getTableBinlogConfig(long dbId, long tableId) {
|
||||
lock.readLock().lock();
|
||||
BinlogConfig tableBinlogConfig = dbTableBinlogEnableMap.get(tableId);
|
||||
lock.readLock().unlock();
|
||||
if (tableBinlogConfig != null) {
|
||||
return tableBinlogConfig;
|
||||
}
|
||||
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
|
||||
if (db == null) {
|
||||
LOG.warn("db not found. dbId: {}", dbId);
|
||||
return null;
|
||||
}
|
||||
|
||||
Table table = db.getTableOrMetaException(tableId);
|
||||
if (table == null) {
|
||||
LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
|
||||
return null;
|
||||
}
|
||||
if (!(table instanceof OlapTable)) {
|
||||
LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
|
||||
return null;
|
||||
}
|
||||
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
tableBinlogConfig = olapTable.getBinlogConfig();
|
||||
dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
|
||||
return tableBinlogConfig;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("fail to get table. db: {}, table id: {}", dbId, tableId);
|
||||
return null;
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isEnableTable(long dbId, long tableId) {
|
||||
BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId);
|
||||
if (tableBinlogConfig == null) {
|
||||
return false;
|
||||
}
|
||||
return tableBinlogConfig.isEnable();
|
||||
}
|
||||
|
||||
public long getTableTtlSeconds(long dbId, long tableId) {
|
||||
BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId);
|
||||
if (tableBinlogConfig == null) {
|
||||
return BinlogConfig.TTL_SECONDS;
|
||||
}
|
||||
return tableBinlogConfig.getTtlSeconds();
|
||||
}
|
||||
|
||||
public void remove(long id) {
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
dbTableBinlogEnableMap.remove(id);
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -41,7 +41,7 @@ import java.util.Map;
|
||||
|
||||
public class BinlogGcer extends MasterDaemon {
|
||||
private static final Logger LOG = LogManager.getLogger(BinlogGcer.class);
|
||||
private static final long GC_DURATION_MS = 313 * 1000L; // 313s
|
||||
private static final long GC_DURATION_MS = 15 * 1000L; // 15s
|
||||
|
||||
// TODO(Drogon): use this to control gc frequency by real gc time waste sample
|
||||
private long lastGcTime = 0L;
|
||||
|
||||
@ -22,8 +22,10 @@ import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.persist.AlterDatabasePropertyInfo;
|
||||
import org.apache.doris.persist.BinlogGcInfo;
|
||||
import org.apache.doris.persist.DropPartitionInfo;
|
||||
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
|
||||
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
|
||||
import org.apache.doris.thrift.TBinlog;
|
||||
import org.apache.doris.thrift.TBinlogType;
|
||||
@ -53,10 +55,33 @@ public class BinlogManager {
|
||||
|
||||
private ReentrantReadWriteLock lock;
|
||||
private Map<Long, DBBinlog> dbBinlogMap;
|
||||
private BinlogConfigCache binlogConfigCache;
|
||||
|
||||
public BinlogManager() {
|
||||
lock = new ReentrantReadWriteLock();
|
||||
dbBinlogMap = Maps.newHashMap();
|
||||
binlogConfigCache = new BinlogConfigCache();
|
||||
}
|
||||
|
||||
private void afterAddBinlog(TBinlog binlog) {
|
||||
if (!binlog.isSetRemoveEnableCache()) {
|
||||
return;
|
||||
}
|
||||
if (!binlog.isRemoveEnableCache()) {
|
||||
return;
|
||||
}
|
||||
|
||||
long dbId = binlog.getDbId();
|
||||
boolean onlyDb = true;
|
||||
if (binlog.isSetTableIds()) {
|
||||
for (long tableId : binlog.getTableIds()) {
|
||||
binlogConfigCache.remove(tableId);
|
||||
onlyDb = false;
|
||||
}
|
||||
}
|
||||
if (onlyDb) {
|
||||
binlogConfigCache.remove(dbId);
|
||||
}
|
||||
}
|
||||
|
||||
private void addBinlog(TBinlog binlog) {
|
||||
@ -64,32 +89,29 @@ public class BinlogManager {
|
||||
return;
|
||||
}
|
||||
|
||||
// find db BinlogConfig
|
||||
long dbId = binlog.getDbId();
|
||||
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
|
||||
if (db == null) {
|
||||
LOG.warn("db not found. dbId: {}", dbId);
|
||||
return;
|
||||
}
|
||||
boolean dbBinlogEnable = db.getBinlogConfig().isEnable();
|
||||
|
||||
DBBinlog dbBinlog;
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
long dbId = binlog.getDbId();
|
||||
dbBinlog = dbBinlogMap.get(dbId);
|
||||
|
||||
if (dbBinlog == null) {
|
||||
dbBinlog = new DBBinlog(binlog);
|
||||
dbBinlog = new DBBinlog(binlogConfigCache, binlog);
|
||||
dbBinlogMap.put(dbId, dbBinlog);
|
||||
}
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
|
||||
dbBinlog.addBinlog(binlog, dbBinlogEnable);
|
||||
dbBinlog.addBinlog(binlog);
|
||||
}
|
||||
|
||||
private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, long timestamp, TBinlogType type,
|
||||
String data) {
|
||||
String data, boolean removeEnableCache) {
|
||||
if (!Config.enable_feature_binlog) {
|
||||
return;
|
||||
}
|
||||
|
||||
TBinlog binlog = new TBinlog();
|
||||
// set commitSeq, timestamp, type, dbId, data
|
||||
binlog.setCommitSeq(commitSeq);
|
||||
@ -101,7 +123,26 @@ public class BinlogManager {
|
||||
binlog.setTableIds(tableIds);
|
||||
}
|
||||
binlog.setTableRef(0);
|
||||
addBinlog(binlog);
|
||||
binlog.setRemoveEnableCache(removeEnableCache);
|
||||
|
||||
// Check if all db or table binlog is disable, return
|
||||
boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
|
||||
boolean anyEnable = dbBinlogEnable;
|
||||
if (tableIds != null) {
|
||||
for (long tableId : tableIds) {
|
||||
boolean tableBinlogEnable = binlogConfigCache.isEnableTable(dbId, tableId);
|
||||
anyEnable = anyEnable || tableBinlogEnable;
|
||||
if (anyEnable) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (anyEnable) {
|
||||
addBinlog(binlog);
|
||||
}
|
||||
|
||||
afterAddBinlog(binlog);
|
||||
}
|
||||
|
||||
public void addUpsertRecord(UpsertRecord upsertRecord) {
|
||||
@ -112,7 +153,7 @@ public class BinlogManager {
|
||||
TBinlogType type = TBinlogType.UPSERT;
|
||||
String data = upsertRecord.toJson();
|
||||
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
|
||||
}
|
||||
|
||||
public void addAddPartitionRecord(AddPartitionRecord addPartitionRecord) {
|
||||
@ -124,7 +165,7 @@ public class BinlogManager {
|
||||
TBinlogType type = TBinlogType.ADD_PARTITION;
|
||||
String data = addPartitionRecord.toJson();
|
||||
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
|
||||
}
|
||||
|
||||
public void addCreateTableRecord(CreateTableRecord createTableRecord) {
|
||||
@ -136,7 +177,7 @@ public class BinlogManager {
|
||||
TBinlogType type = TBinlogType.CREATE_TABLE;
|
||||
String data = createTableRecord.toJson();
|
||||
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
|
||||
}
|
||||
|
||||
public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long commitSeq) {
|
||||
@ -147,7 +188,7 @@ public class BinlogManager {
|
||||
TBinlogType type = TBinlogType.DROP_PARTITION;
|
||||
String data = dropPartitionInfo.toJson();
|
||||
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
|
||||
}
|
||||
|
||||
public void addDropTableRecord(DropTableRecord record) {
|
||||
@ -159,7 +200,7 @@ public class BinlogManager {
|
||||
TBinlogType type = TBinlogType.DROP_TABLE;
|
||||
String data = record.toJson();
|
||||
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
|
||||
}
|
||||
|
||||
public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) {
|
||||
@ -170,7 +211,7 @@ public class BinlogManager {
|
||||
TBinlogType type = TBinlogType.ALTER_JOB;
|
||||
String data = alterJob.toJson();
|
||||
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
|
||||
}
|
||||
|
||||
public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long commitSeq) {
|
||||
@ -181,7 +222,29 @@ public class BinlogManager {
|
||||
TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_COLUMNS;
|
||||
String data = info.toJson();
|
||||
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
|
||||
}
|
||||
|
||||
public void addAlterDatabaseProperty(AlterDatabasePropertyInfo info, long commitSeq) {
|
||||
long dbId = info.getDbId();
|
||||
List<Long> tableIds = Lists.newArrayList();
|
||||
|
||||
long timestamp = -1;
|
||||
TBinlogType type = TBinlogType.ALTER_DATABASE_PROPERTY;
|
||||
String data = info.toJson();
|
||||
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true);
|
||||
}
|
||||
|
||||
public void addModifyTableProperty(ModifyTablePropertyOperationLog info, long commitSeq) {
|
||||
long dbId = info.getDbId();
|
||||
List<Long> tableIds = Lists.newArrayList();
|
||||
tableIds.add(info.getTableId());
|
||||
long timestamp = -1;
|
||||
TBinlogType type = TBinlogType.MODIFY_TABLE_PROPERTY;
|
||||
String data = info.toJson();
|
||||
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true);
|
||||
}
|
||||
|
||||
// get binlog by dbId, return first binlog.version > version
|
||||
@ -383,7 +446,8 @@ public class BinlogManager {
|
||||
if (binlog.getType() == TBinlogType.DUMMY) {
|
||||
// collect tableDummyBinlogs and dbDummyBinlog to recover DBBinlog and TableBinlog
|
||||
if (binlog.getBelong() == -1) {
|
||||
DBBinlog dbBinlog = DBBinlog.recoverDbBinlog(binlog, tableDummies, currentDbBinlogEnable);
|
||||
DBBinlog dbBinlog = DBBinlog.recoverDbBinlog(binlogConfigCache, binlog, tableDummies,
|
||||
currentDbBinlogEnable);
|
||||
dbBinlogMap.put(dbId, dbBinlog);
|
||||
} else {
|
||||
tableDummies.add(binlog);
|
||||
|
||||
@ -92,7 +92,7 @@ public class BinlogTombstone {
|
||||
return dbId;
|
||||
}
|
||||
|
||||
// TODO(deadlinefen): delete this code later
|
||||
// TODO(deadlinefen): deprecated this code later
|
||||
public List<Long> getTableIds() {
|
||||
if (tableIds == null) {
|
||||
tableIds = Collections.emptyList();
|
||||
@ -102,7 +102,7 @@ public class BinlogTombstone {
|
||||
|
||||
public Map<Long, Long> getTableCommitSeqMap() {
|
||||
if (tableCommitSeqMap == null) {
|
||||
tableCommitSeqMap = Collections.emptyMap();
|
||||
tableCommitSeqMap = Maps.newHashMap();
|
||||
}
|
||||
return tableCommitSeqMap;
|
||||
}
|
||||
|
||||
@ -17,24 +17,15 @@
|
||||
|
||||
package org.apache.doris.binlog;
|
||||
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.thrift.TBinlog;
|
||||
import org.apache.doris.thrift.TBinlogType;
|
||||
import org.apache.doris.thrift.TStatus;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.TreeSet;
|
||||
|
||||
public class BinlogUtils {
|
||||
private static final Logger LOG = LogManager.getLogger(BinlogUtils.class);
|
||||
|
||||
public static Pair<TStatus, TBinlog> getBinlog(TreeSet<TBinlog> binlogs, long prevCommitSeq) {
|
||||
TStatus status = new TStatus(TStatusCode.OK);
|
||||
TBinlog firstBinlog = binlogs.first();
|
||||
@ -90,33 +81,6 @@ public class BinlogUtils {
|
||||
return dummy;
|
||||
}
|
||||
|
||||
public static boolean tableEnabledBinlog(long dbId, long tableId) {
|
||||
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
|
||||
if (db == null) {
|
||||
LOG.error("db not found. dbId: {}", dbId);
|
||||
return false;
|
||||
}
|
||||
|
||||
OlapTable table;
|
||||
try {
|
||||
Table tbl = db.getTableOrMetaException(tableId);
|
||||
if (tbl == null) {
|
||||
LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
|
||||
return false;
|
||||
}
|
||||
if (!(tbl instanceof OlapTable)) {
|
||||
LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
|
||||
return false;
|
||||
}
|
||||
table = (OlapTable) tbl;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
|
||||
return false;
|
||||
}
|
||||
|
||||
return table.getBinlogConfig().isEnable();
|
||||
}
|
||||
|
||||
public static long getExpiredMs(long ttlSeconds) {
|
||||
long currentSeconds = System.currentTimeMillis() / 1000;
|
||||
if (currentSeconds < ttlSeconds) {
|
||||
|
||||
@ -17,8 +17,7 @@
|
||||
|
||||
package org.apache.doris.binlog;
|
||||
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.BinlogConfig;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.thrift.TBinlog;
|
||||
import org.apache.doris.thrift.TBinlogType;
|
||||
@ -55,9 +54,12 @@ public class DBBinlog {
|
||||
|
||||
private List<TBinlog> tableDummyBinlogs;
|
||||
|
||||
public DBBinlog(TBinlog binlog) {
|
||||
private BinlogConfigCache binlogConfigCache;
|
||||
|
||||
public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) {
|
||||
lock = new ReentrantReadWriteLock();
|
||||
this.dbId = binlog.getDbId();
|
||||
this.binlogConfigCache = binlogConfigCache;
|
||||
|
||||
// allBinlogs treeset order by commitSeq
|
||||
allBinlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
|
||||
@ -74,14 +76,16 @@ public class DBBinlog {
|
||||
allBinlogs.add(dummy);
|
||||
}
|
||||
|
||||
public static DBBinlog recoverDbBinlog(TBinlog dbDummy, List<TBinlog> tableDummies, boolean dbBinlogEnable) {
|
||||
DBBinlog dbBinlog = new DBBinlog(dbDummy);
|
||||
public static DBBinlog recoverDbBinlog(BinlogConfigCache binlogConfigCache, TBinlog dbDummy,
|
||||
List<TBinlog> tableDummies, boolean dbBinlogEnable) {
|
||||
DBBinlog dbBinlog = new DBBinlog(binlogConfigCache, dbDummy);
|
||||
long dbId = dbDummy.getDbId();
|
||||
for (TBinlog tableDummy : tableDummies) {
|
||||
long tableId = tableDummy.getBelong();
|
||||
if (!dbBinlogEnable && !BinlogUtils.tableEnabledBinlog(dbBinlog.getDbId(), tableId)) {
|
||||
if (!dbBinlogEnable && !binlogConfigCache.isEnableTable(dbId, tableId)) {
|
||||
continue;
|
||||
}
|
||||
dbBinlog.tableBinlogMap.put(tableId, new TableBinlog(tableDummy, tableId));
|
||||
dbBinlog.tableBinlogMap.put(tableId, new TableBinlog(binlogConfigCache, tableDummy, dbId, tableId));
|
||||
dbBinlog.tableDummyBinlogs.add(tableDummy);
|
||||
}
|
||||
|
||||
@ -111,11 +115,12 @@ public class DBBinlog {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(Drogon): remove TableBinlog after DropTable, think table drop && recovery
|
||||
private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean dbBinlogEnable) {
|
||||
TableBinlog tableBinlog = tableBinlogMap.get(tableId);
|
||||
if (tableBinlog == null) {
|
||||
if (dbBinlogEnable || BinlogUtils.tableEnabledBinlog(dbId, tableId)) {
|
||||
tableBinlog = new TableBinlog(binlog, tableId);
|
||||
if (dbBinlogEnable || binlogConfigCache.isEnableTable(dbId, tableId)) {
|
||||
tableBinlog = new TableBinlog(binlogConfigCache, binlog, dbId, tableId);
|
||||
tableBinlogMap.put(tableId, tableBinlog);
|
||||
tableDummyBinlogs.add(tableBinlog.getDummyBinlog());
|
||||
}
|
||||
@ -123,21 +128,25 @@ public class DBBinlog {
|
||||
return tableBinlog;
|
||||
}
|
||||
|
||||
public void addBinlog(TBinlog binlog, boolean dbBinlogEnable) {
|
||||
// guard by BinlogManager, if addBinlog called, more than one(db/tables) enable binlog
|
||||
public void addBinlog(TBinlog binlog) {
|
||||
boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
|
||||
List<Long> tableIds = binlog.getTableIds();
|
||||
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
allBinlogs.add(binlog);
|
||||
|
||||
if (binlog.getTimestamp() > 0 && dbBinlogEnable) {
|
||||
timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
|
||||
}
|
||||
|
||||
allBinlogs.add(binlog);
|
||||
|
||||
if (tableIds == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// HACK: for metadata fix
|
||||
// we should not add binlog for create table and drop table in table binlog
|
||||
if (!binlog.isSetType()) {
|
||||
return;
|
||||
}
|
||||
@ -205,22 +214,22 @@ public class DBBinlog {
|
||||
|
||||
public BinlogTombstone gc() {
|
||||
// check db
|
||||
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
|
||||
if (db == null) {
|
||||
BinlogConfig dbBinlogConfig = binlogConfigCache.getDBBinlogConfig(dbId);
|
||||
if (dbBinlogConfig == null) {
|
||||
LOG.error("db not found. dbId: {}", dbId);
|
||||
return null;
|
||||
}
|
||||
|
||||
boolean dbBinlogEnable = db.getBinlogConfig().isEnable();
|
||||
boolean dbBinlogEnable = dbBinlogConfig.isEnable();
|
||||
BinlogTombstone tombstone;
|
||||
if (dbBinlogEnable) {
|
||||
// db binlog is enabled, only one binlogTombstones
|
||||
long ttlSeconds = db.getBinlogConfig().getTtlSeconds();
|
||||
long ttlSeconds = dbBinlogConfig.getTtlSeconds();
|
||||
long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
|
||||
|
||||
tombstone = dbBinlogEnableGc(expiredMs);
|
||||
} else {
|
||||
tombstone = dbBinlogDisableGc(db);
|
||||
tombstone = dbBinlogDisableGc();
|
||||
}
|
||||
|
||||
return tombstone;
|
||||
@ -250,7 +259,7 @@ public class DBBinlog {
|
||||
return dbTombstone;
|
||||
}
|
||||
|
||||
private BinlogTombstone dbBinlogDisableGc(Database db) {
|
||||
private BinlogTombstone dbBinlogDisableGc() {
|
||||
List<BinlogTombstone> tombstones = Lists.newArrayList();
|
||||
List<TableBinlog> tableBinlogs;
|
||||
|
||||
@ -262,7 +271,7 @@ public class DBBinlog {
|
||||
}
|
||||
|
||||
for (TableBinlog tableBinlog : tableBinlogs) {
|
||||
BinlogTombstone tombstone = tableBinlog.gc(db);
|
||||
BinlogTombstone tombstone = tableBinlog.ttlGc();
|
||||
if (tombstone != null) {
|
||||
tombstones.add(tombstone);
|
||||
}
|
||||
@ -348,7 +357,7 @@ public class DBBinlog {
|
||||
List<BinlogTombstone> tableTombstones = Lists.newArrayList();
|
||||
for (TableBinlog tableBinlog : tableBinlogMap.values()) {
|
||||
// step 2.1: gc tableBinlog,and get table tombstone
|
||||
BinlogTombstone tableTombstone = tableBinlog.gc(expiredCommitSeq);
|
||||
BinlogTombstone tableTombstone = tableBinlog.commitSeqGc(expiredCommitSeq);
|
||||
if (tableTombstone != null) {
|
||||
tableTombstones.add(tableTombstone);
|
||||
}
|
||||
|
||||
@ -17,9 +17,7 @@
|
||||
|
||||
package org.apache.doris.binlog;
|
||||
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.BinlogConfig;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.thrift.TBinlog;
|
||||
import org.apache.doris.thrift.TBinlogType;
|
||||
@ -37,11 +35,14 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
public class TableBinlog {
|
||||
private static final Logger LOG = LogManager.getLogger(TableBinlog.class);
|
||||
|
||||
private long dbId;
|
||||
private long tableId;
|
||||
private ReentrantReadWriteLock lock;
|
||||
private TreeSet<TBinlog> binlogs;
|
||||
private BinlogConfigCache binlogConfigCache;
|
||||
|
||||
public TableBinlog(TBinlog binlog, long tableId) {
|
||||
public TableBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog, long dbId, long tableId) {
|
||||
this.dbId = dbId;
|
||||
this.tableId = tableId;
|
||||
lock = new ReentrantReadWriteLock();
|
||||
binlogs = Sets.newTreeSet(Comparator.comparingLong(TBinlog::getCommitSeq));
|
||||
@ -53,6 +54,7 @@ public class TableBinlog {
|
||||
dummy = BinlogUtils.newDummyBinlog(binlog.getDbId(), tableId);
|
||||
}
|
||||
binlogs.add(dummy);
|
||||
this.binlogConfigCache = binlogConfigCache;
|
||||
}
|
||||
|
||||
public TBinlog getDummyBinlog() {
|
||||
@ -100,7 +102,7 @@ public class TableBinlog {
|
||||
}
|
||||
}
|
||||
|
||||
private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(long expired, BinlogComparator check) {
|
||||
private Pair<TBinlog, Long> getLastUpsertAndLargestCommitSeq(long expired, BinlogComparator checker) {
|
||||
if (binlogs.size() <= 1) {
|
||||
return null;
|
||||
}
|
||||
@ -111,7 +113,7 @@ public class TableBinlog {
|
||||
TBinlog lastExpiredBinlog = null;
|
||||
while (iter.hasNext()) {
|
||||
TBinlog binlog = iter.next();
|
||||
if (check.isExpired(binlog, expired)) {
|
||||
if (checker.isExpired(binlog, expired)) {
|
||||
lastExpiredBinlog = binlog;
|
||||
--binlog.table_ref;
|
||||
if (binlog.getType() == TBinlogType.UPSERT) {
|
||||
@ -133,7 +135,7 @@ public class TableBinlog {
|
||||
}
|
||||
|
||||
// this method call when db binlog enable
|
||||
public BinlogTombstone gc(long expiredCommitSeq) {
|
||||
public BinlogTombstone commitSeqGc(long expiredCommitSeq) {
|
||||
Pair<TBinlog, Long> tombstoneInfo;
|
||||
|
||||
// step 1: get tombstoneUpsertBinlog and dummyBinlog
|
||||
@ -163,31 +165,20 @@ public class TableBinlog {
|
||||
}
|
||||
|
||||
// this method call when db binlog disable
|
||||
public BinlogTombstone gc(Database db) {
|
||||
public BinlogTombstone ttlGc() {
|
||||
// step 1: get expire time
|
||||
OlapTable table;
|
||||
try {
|
||||
Table tbl = db.getTableOrMetaException(tableId);
|
||||
if (tbl == null) {
|
||||
LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
|
||||
return null;
|
||||
}
|
||||
if (!(tbl instanceof OlapTable)) {
|
||||
LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
|
||||
return null;
|
||||
}
|
||||
table = (OlapTable) tbl;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
|
||||
BinlogConfig tableBinlogConfig = binlogConfigCache.getTableBinlogConfig(dbId, tableId);
|
||||
if (tableBinlogConfig == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
long ttlSeconds = table.getBinlogConfig().getTtlSeconds();
|
||||
long ttlSeconds = tableBinlogConfig.getTtlSeconds();
|
||||
long expiredMs = BinlogUtils.getExpiredMs(ttlSeconds);
|
||||
|
||||
if (expiredMs < 0) {
|
||||
return null;
|
||||
}
|
||||
LOG.info("ttl gc. dbId: {}, tableId: {}, expiredMs: {}", dbId, tableId, expiredMs);
|
||||
|
||||
// step 2: get tombstoneUpsertBinlog and dummyBinlog
|
||||
Pair<TBinlog, Long> tombstoneInfo;
|
||||
|
||||
@ -4400,8 +4400,9 @@ public class Env {
|
||||
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), table, false);
|
||||
dynamicPartitionScheduler.createOrUpdateRuntimeInfo(table.getId(), DynamicPartitionScheduler.LAST_UPDATE_TIME,
|
||||
TimeUtils.getCurrentFormatTime());
|
||||
ModifyTablePropertyOperationLog info = new ModifyTablePropertyOperationLog(db.getId(), table.getId(),
|
||||
logProperties);
|
||||
ModifyTablePropertyOperationLog info =
|
||||
new ModifyTablePropertyOperationLog(db.getId(), table.getId(), table.getName(),
|
||||
logProperties);
|
||||
editLog.logDynamicPartition(info);
|
||||
}
|
||||
|
||||
@ -4473,9 +4474,9 @@ public class Env {
|
||||
public void modifyTableDefaultReplicaAllocation(Database db, OlapTable table, Map<String, String> properties) {
|
||||
Preconditions.checkArgument(table.isWriteLockHeldByCurrentThread());
|
||||
table.setReplicaAllocation(properties);
|
||||
// log
|
||||
ModifyTablePropertyOperationLog info = new ModifyTablePropertyOperationLog(db.getId(), table.getId(),
|
||||
properties);
|
||||
ModifyTablePropertyOperationLog info =
|
||||
new ModifyTablePropertyOperationLog(db.getId(), table.getId(), table.getName(),
|
||||
properties);
|
||||
editLog.logModifyReplicationNum(info);
|
||||
LOG.debug("modify table[{}] replication num to {}", table.getName(),
|
||||
properties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM));
|
||||
@ -4500,8 +4501,9 @@ public class Env {
|
||||
table.getPartitionInfo().setStoragePolicy(partition.getId(), tableProperty.getStoragePolicy());
|
||||
}
|
||||
|
||||
ModifyTablePropertyOperationLog info = new ModifyTablePropertyOperationLog(db.getId(), table.getId(),
|
||||
properties);
|
||||
ModifyTablePropertyOperationLog info =
|
||||
new ModifyTablePropertyOperationLog(db.getId(), table.getId(), table.getName(),
|
||||
properties);
|
||||
editLog.logModifyInMemory(info);
|
||||
}
|
||||
|
||||
@ -4510,8 +4512,9 @@ public class Env {
|
||||
|
||||
table.setBinlogConfig(newBinlogConfig);
|
||||
|
||||
ModifyTablePropertyOperationLog info = new ModifyTablePropertyOperationLog(db.getId(), table.getId(),
|
||||
newBinlogConfig.toProperties());
|
||||
ModifyTablePropertyOperationLog info =
|
||||
new ModifyTablePropertyOperationLog(db.getId(), table.getId(), table.getName(),
|
||||
newBinlogConfig.toProperties());
|
||||
editLog.logUpdateBinlogConfig(info);
|
||||
}
|
||||
|
||||
|
||||
@ -759,6 +759,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
public void alterDatabaseProperty(AlterDatabasePropertyStmt stmt) throws DdlException {
|
||||
String dbName = stmt.getDbName();
|
||||
Database db = (Database) getDbOrDdlException(dbName);
|
||||
long dbId = db.getId();
|
||||
Map<String, String> properties = stmt.getProperties();
|
||||
|
||||
db.writeLockOrDdlException();
|
||||
@ -768,7 +769,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
return;
|
||||
}
|
||||
|
||||
AlterDatabasePropertyInfo info = new AlterDatabasePropertyInfo(dbName, properties);
|
||||
AlterDatabasePropertyInfo info = new AlterDatabasePropertyInfo(dbId, dbName, properties);
|
||||
Env.getCurrentEnv().getEditLog().logAlterDatabaseProperty(info);
|
||||
} finally {
|
||||
db.writeUnlock();
|
||||
|
||||
@ -29,6 +29,9 @@ import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
public class AlterDatabasePropertyInfo implements Writable {
|
||||
@SerializedName(value = "dbId")
|
||||
private long dbId;
|
||||
|
||||
@SerializedName(value = "dbName")
|
||||
private String dbName;
|
||||
|
||||
@ -41,11 +44,16 @@ public class AlterDatabasePropertyInfo implements Writable {
|
||||
this.properties = null;
|
||||
}
|
||||
|
||||
public AlterDatabasePropertyInfo(String dbName, Map<String, String> properties) {
|
||||
public AlterDatabasePropertyInfo(long dbId, String dbName, Map<String, String> properties) {
|
||||
this.dbId = dbId;
|
||||
this.dbName = dbName;
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
public long getDbId() {
|
||||
return dbId;
|
||||
}
|
||||
|
||||
public String getDbName() {
|
||||
return dbName;
|
||||
}
|
||||
|
||||
@ -750,6 +750,7 @@ public class EditLog {
|
||||
case OperationType.OP_MODIFY_REPLICATION_NUM: {
|
||||
ModifyTablePropertyOperationLog log = (ModifyTablePropertyOperationLog) journal.getData();
|
||||
env.replayModifyTableProperty(opCode, log);
|
||||
env.getBinlogManager().addModifyTableProperty(log, logId);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM: {
|
||||
@ -1039,6 +1040,7 @@ public class EditLog {
|
||||
LOG.info("replay alter database property: {}", alterDatabasePropertyInfo);
|
||||
env.replayAlterDatabaseProperty(alterDatabasePropertyInfo.getDbName(),
|
||||
alterDatabasePropertyInfo.getProperties());
|
||||
env.getBinlogManager().addAlterDatabaseProperty(alterDatabasePropertyInfo, logId);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_GC_BINLOG: {
|
||||
@ -1623,24 +1625,30 @@ public class EditLog {
|
||||
logEdit(OperationType.OP_MODIFY_DISTRIBUTION_TYPE, tableInfo);
|
||||
}
|
||||
|
||||
public void logDynamicPartition(ModifyTablePropertyOperationLog info) {
|
||||
logEdit(OperationType.OP_DYNAMIC_PARTITION, info);
|
||||
private long logModifyTableProperty(short op, ModifyTablePropertyOperationLog info) {
|
||||
long logId = logEdit(op, info);
|
||||
Env.getCurrentEnv().getBinlogManager().addModifyTableProperty(info, logId);
|
||||
return logId;
|
||||
}
|
||||
|
||||
public void logModifyReplicationNum(ModifyTablePropertyOperationLog info) {
|
||||
logEdit(OperationType.OP_MODIFY_REPLICATION_NUM, info);
|
||||
public void logDynamicPartition(ModifyTablePropertyOperationLog info) {
|
||||
logModifyTableProperty(OperationType.OP_DYNAMIC_PARTITION, info);
|
||||
}
|
||||
|
||||
public long logModifyReplicationNum(ModifyTablePropertyOperationLog info) {
|
||||
return logModifyTableProperty(OperationType.OP_MODIFY_REPLICATION_NUM, info);
|
||||
}
|
||||
|
||||
public void logModifyDefaultDistributionBucketNum(ModifyTableDefaultDistributionBucketNumOperationLog info) {
|
||||
logEdit(OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM, info);
|
||||
}
|
||||
|
||||
public void logModifyInMemory(ModifyTablePropertyOperationLog info) {
|
||||
logEdit(OperationType.OP_MODIFY_IN_MEMORY, info);
|
||||
public long logModifyInMemory(ModifyTablePropertyOperationLog info) {
|
||||
return logModifyTableProperty(OperationType.OP_MODIFY_IN_MEMORY, info);
|
||||
}
|
||||
|
||||
public void logUpdateBinlogConfig(ModifyTablePropertyOperationLog info) {
|
||||
logEdit(OperationType.OP_UPDATE_BINLOG_CONFIG, info);
|
||||
public long logUpdateBinlogConfig(ModifyTablePropertyOperationLog info) {
|
||||
return logModifyTableProperty(OperationType.OP_UPDATE_BINLOG_CONFIG, info);
|
||||
}
|
||||
|
||||
public void logAlterLightSchemaChange(AlterLightSchemaChangeInfo info) {
|
||||
@ -1829,7 +1837,9 @@ public class EditLog {
|
||||
}
|
||||
|
||||
public long logAlterDatabaseProperty(AlterDatabasePropertyInfo log) {
|
||||
return logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log);
|
||||
long logId = logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log);
|
||||
Env.getCurrentEnv().getBinlogManager().addAlterDatabaseProperty(log, logId);
|
||||
return logId;
|
||||
}
|
||||
|
||||
public long logGcBinlog(BinlogGcInfo log) {
|
||||
|
||||
@ -35,13 +35,27 @@ public class ModifyTablePropertyOperationLog implements Writable {
|
||||
private long dbId;
|
||||
@SerializedName(value = "tableId")
|
||||
private long tableId;
|
||||
@SerializedName(value = "tableName")
|
||||
private String tableName;
|
||||
@SerializedName(value = "properties")
|
||||
private Map<String, String> properties = new HashMap<>();
|
||||
@SerializedName(value = "sql")
|
||||
private String sql;
|
||||
|
||||
public ModifyTablePropertyOperationLog(long dbId, long tableId, Map<String, String> properties) {
|
||||
public ModifyTablePropertyOperationLog(long dbId, long tableId, String tableName, Map<String, String> properties) {
|
||||
this.dbId = dbId;
|
||||
this.tableId = tableId;
|
||||
this.tableName = tableName;
|
||||
this.properties = properties;
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("SET (");
|
||||
for (Map.Entry<String, String> entry : properties.entrySet()) {
|
||||
sb.append(entry.getKey()).append("=").append(entry.getValue()).append(",");
|
||||
}
|
||||
sb.deleteCharAt(sb.length() - 1); // remove last ','
|
||||
sb.append(")");
|
||||
this.sql = sb.toString();
|
||||
}
|
||||
|
||||
public long getDbId() {
|
||||
@ -64,4 +78,8 @@ public class ModifyTablePropertyOperationLog implements Writable {
|
||||
public static ModifyTablePropertyOperationLog read(DataInput in) throws IOException {
|
||||
return GsonUtils.GSON.fromJson(Text.readString(in), ModifyTablePropertyOperationLog.class);
|
||||
}
|
||||
|
||||
public String toJson() {
|
||||
return GsonUtils.GSON.toJson(this);
|
||||
}
|
||||
}
|
||||
|
||||
@ -54,7 +54,7 @@ public class ModifyDynamicPartitionInfoTest {
|
||||
properties.put(DynamicPartitionProperty.END, "3");
|
||||
properties.put(DynamicPartitionProperty.PREFIX, "p");
|
||||
properties.put(DynamicPartitionProperty.BUCKETS, "30");
|
||||
ModifyTablePropertyOperationLog modifyDynamicPartitionInfo = new ModifyTablePropertyOperationLog(100L, 200L, properties);
|
||||
ModifyTablePropertyOperationLog modifyDynamicPartitionInfo = new ModifyTablePropertyOperationLog(100L, 200L, "test", properties);
|
||||
modifyDynamicPartitionInfo.write(out);
|
||||
out.flush();
|
||||
out.close();
|
||||
|
||||
Reference in New Issue
Block a user