branch-2.1: [feat](binlog) filter the async mv binlogs #49028 (#49099)

Cherry-picked from #49028

Co-authored-by: walter <maochuan@selectdb.com>
This commit is contained in:
github-actions[bot]
2025-03-28 10:01:00 +08:00
committed by GitHub
parent 2fb8e00907
commit 92176c46bf
2 changed files with 69 additions and 23 deletions

View File

@ -22,6 +22,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -34,10 +35,12 @@ public class BinlogConfigCache {
private static final Logger LOG = LogManager.getLogger(BinlogConfigCache.class);
private Map<Long, BinlogConfig> dbTableBinlogEnableMap; // db or table all use id
private Map<Long, TableIf.TableType> tableTypeMap;
private ReentrantReadWriteLock lock;
public BinlogConfigCache() {
dbTableBinlogEnableMap = new HashMap<Long, BinlogConfig>();
tableTypeMap = new HashMap<Long, TableIf.TableType>();
lock = new ReentrantReadWriteLock();
}
@ -93,29 +96,8 @@ public class BinlogConfigCache {
lock.writeLock().lock();
try {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
LOG.warn("db not found. dbId: {}", dbId);
return null;
}
Table table = db.getTableNullable(tableId);
if (table == null) {
LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
return null;
}
if (!(table instanceof OlapTable)) {
LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
return null;
}
OlapTable olapTable = (OlapTable) table;
tableBinlogConfig = olapTable.getBinlogConfig();
// get table binlog config, when table modify binlogConfig
// it create a new binlog, not update inplace, so we don't need to clone
// binlogConfig
dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
return tableBinlogConfig;
loadTableBinlogConfig(dbId, tableId);
return dbTableBinlogEnableMap.get(tableId); // null if not exists
} catch (Exception e) {
LOG.warn("fail to get table. db: {}, table id: {}", dbId, tableId);
return null;
@ -124,6 +106,48 @@ public class BinlogConfigCache {
}
}
private void loadTableBinlogConfig(long dbId, long tableId) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
LOG.warn("db not found. dbId: {}", dbId);
return;
}
Table table = db.getTableNullable(tableId);
if (table == null) {
LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
return;
}
if (!(table instanceof OlapTable)) { // MTMV is an instance of OlapTable
LOG.warn("table is not olap table. db: {}, table id: {}", db.getFullName(), tableId);
return;
}
OlapTable olapTable = (OlapTable) table;
// get table binlog config, when table modify binlogConfig
// it create a new binlog, not update inplace, so we don't need to clone
// binlogConfig
dbTableBinlogEnableMap.put(tableId, olapTable.getBinlogConfig());
tableTypeMap.put(tableId, table.getType());
}
public boolean isAsyncMvTable(long dbId, long tableId) {
lock.readLock().lock();
TableIf.TableType tableType = tableTypeMap.get(tableId);
lock.readLock().unlock();
if (tableType != null) {
return tableType == TableIf.TableType.MATERIALIZED_VIEW;
}
lock.writeLock().lock();
try {
loadTableBinlogConfig(dbId, tableId);
return tableTypeMap.get(tableId) == TableIf.TableType.MATERIALIZED_VIEW;
} finally {
lock.writeLock().unlock();
}
}
public boolean isEnableTable(long dbId, long tableId) {
BinlogConfig tableBinlogConfig = getTableBinlogConfig(dbId, tableId);
if (tableBinlogConfig == null) {

View File

@ -107,11 +107,33 @@ public class BinlogManager {
}
}
private boolean isAsyncMvBinlog(TBinlog binlog) {
if (!binlog.isSetTableIds()) {
return false;
}
// Filter the binlogs belong to async materialized view, since we don't support async mv right now.
for (long tableId : binlog.getTableIds()) {
if (binlogConfigCache.isAsyncMvTable(binlog.getDbId(), tableId)) {
LOG.debug("filter the async mv binlog, db {}, table {}, commit seq {}, ts {}, type {}, data {}",
binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(), binlog.getTimestamp(),
binlog.getType(), binlog.getData());
return true;
}
}
return false;
}
private void addBinlog(TBinlog binlog, Object raw) {
if (!Config.enable_feature_binlog) {
return;
}
if (isAsyncMvBinlog(binlog)) {
return;
}
LOG.debug("add binlog, db {}, table {}, commitSeq {}, timestamp {}, type {}, data {}",
binlog.getDbId(), binlog.getTableIds(), binlog.getCommitSeq(), binlog.getTimestamp(), binlog.getType(),
binlog.getData());