[ehmancement](binlog) Add show proc '/binlog' impl (#30770)
Signed-off-by: Jack Drogon <jack.xsuperman@gmail.com>
This commit is contained in:
@ -97,7 +97,7 @@ public class BinlogConfigCache {
|
||||
return null;
|
||||
}
|
||||
|
||||
Table table = db.getTableOrMetaException(tableId);
|
||||
Table table = db.getTableNullable(tableId);
|
||||
if (table == null) {
|
||||
LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
|
||||
return null;
|
||||
@ -109,6 +109,8 @@ public class BinlogConfigCache {
|
||||
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
tableBinlogConfig = olapTable.getBinlogConfig();
|
||||
// get table binlog config, when table modify binlogConfig
|
||||
// it create a new binlog, not update inplace, so we don't need to clone binlogConfig
|
||||
dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
|
||||
return tableBinlogConfig;
|
||||
} catch (Exception e) {
|
||||
|
||||
@ -22,6 +22,8 @@ import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.proc.BaseProcResult;
|
||||
import org.apache.doris.common.proc.ProcResult;
|
||||
import org.apache.doris.persist.AlterDatabasePropertyInfo;
|
||||
import org.apache.doris.persist.BarrierLog;
|
||||
import org.apache.doris.persist.BatchModifyPartitionsInfo;
|
||||
@ -36,6 +38,7 @@ import org.apache.doris.thrift.TBinlogType;
|
||||
import org.apache.doris.thrift.TStatus;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -54,6 +57,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
public class BinlogManager {
|
||||
private static final int BUFFER_SIZE = 16 * 1024;
|
||||
private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("Name")
|
||||
.add("Type").add("Id").add("Dropped").add("BinlogLength").add("FirstBinlogCommittedTime")
|
||||
.add("ReadableFirstBinlogCommittedTime").add("LastBinlogCommittedTime")
|
||||
.add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds")
|
||||
.build();
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(BinlogManager.class);
|
||||
|
||||
@ -545,6 +553,22 @@ public class BinlogManager {
|
||||
return checksum;
|
||||
}
|
||||
|
||||
public ProcResult getBinlogInfo() {
|
||||
BaseProcResult result = new BaseProcResult();
|
||||
result.setNames(TITLE_NAMES);
|
||||
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
for (DBBinlog dbBinlog : dbBinlogMap.values()) {
|
||||
dbBinlog.getBinlogInfo(result);
|
||||
}
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// remove DB
|
||||
// remove Table
|
||||
}
|
||||
|
||||
@ -90,4 +90,8 @@ public class BinlogUtils {
|
||||
long expireSeconds = currentSeconds - ttlSeconds;
|
||||
return expireSeconds * 1000;
|
||||
}
|
||||
|
||||
public static String convertTimeToReadable(long time) {
|
||||
return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date(time));
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,7 +18,10 @@
|
||||
package org.apache.doris.binlog;
|
||||
|
||||
import org.apache.doris.catalog.BinlogConfig;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.proc.BaseProcResult;
|
||||
import org.apache.doris.thrift.TBinlog;
|
||||
import org.apache.doris.thrift.TBinlogType;
|
||||
import org.apache.doris.thrift.TStatus;
|
||||
@ -30,6 +33,7 @@ import com.google.common.collect.Sets;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@ -448,4 +452,64 @@ public class DBBinlog {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void getBinlogInfo(BaseProcResult result) {
|
||||
BinlogConfig binlogConfig = binlogConfigCache.getDBBinlogConfig(dbId);
|
||||
|
||||
String dbName = "(dropped)";
|
||||
String dropped = "true";
|
||||
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
|
||||
if (db != null) {
|
||||
dbName = db.getFullName();
|
||||
dropped = "false";
|
||||
}
|
||||
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
|
||||
if (dbBinlogEnable) {
|
||||
List<String> info = new ArrayList<>();
|
||||
|
||||
info.add(dbName);
|
||||
String type = "db";
|
||||
info.add(type);
|
||||
String id = String.valueOf(dbId);
|
||||
info.add(id);
|
||||
info.add(dropped);
|
||||
String binlogLength = String.valueOf(allBinlogs.size());
|
||||
info.add(binlogLength);
|
||||
String firstBinlogCommittedTime = null;
|
||||
String readableFirstBinlogCommittedTime = null;
|
||||
if (!timestamps.isEmpty()) {
|
||||
long timestamp = timestamps.get(0).second;
|
||||
firstBinlogCommittedTime = String.valueOf(timestamp);
|
||||
readableFirstBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp);
|
||||
}
|
||||
info.add(firstBinlogCommittedTime);
|
||||
info.add(readableFirstBinlogCommittedTime);
|
||||
String lastBinlogCommittedTime = null;
|
||||
String readableLastBinlogCommittedTime = null;
|
||||
if (!timestamps.isEmpty()) {
|
||||
long timestamp = timestamps.get(timestamps.size() - 1).second;
|
||||
lastBinlogCommittedTime = String.valueOf(timestamp);
|
||||
readableLastBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp);
|
||||
}
|
||||
info.add(lastBinlogCommittedTime);
|
||||
info.add(readableLastBinlogCommittedTime);
|
||||
String binlogTtlSeconds = null;
|
||||
if (binlogConfig != null) {
|
||||
binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds());
|
||||
}
|
||||
info.add(binlogTtlSeconds);
|
||||
|
||||
result.addRow(info);
|
||||
} else {
|
||||
for (TableBinlog tableBinlog : tableBinlogMap.values()) {
|
||||
tableBinlog.getBinlogInfo(db, result);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,7 +18,11 @@
|
||||
package org.apache.doris.binlog;
|
||||
|
||||
import org.apache.doris.catalog.BinlogConfig;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.proc.BaseProcResult;
|
||||
import org.apache.doris.thrift.TBinlog;
|
||||
import org.apache.doris.thrift.TBinlogType;
|
||||
import org.apache.doris.thrift.TStatus;
|
||||
@ -27,8 +31,10 @@ import com.google.common.collect.Sets;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
@ -233,4 +239,80 @@ public class TableBinlog {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void getBinlogInfo(Database db, BaseProcResult result) {
|
||||
BinlogConfig binlogConfig = binlogConfigCache.getTableBinlogConfig(dbId, tableId);
|
||||
|
||||
String tableName = null;
|
||||
String dropped = null;
|
||||
if (db == null) {
|
||||
tableName = "(dropped).(unknown)";
|
||||
dropped = "true";
|
||||
} else {
|
||||
String dbName = db.getFullName();
|
||||
Table table = db.getTableNullable(tableId);
|
||||
if (table == null) {
|
||||
dropped = "true";
|
||||
tableName = dbName + ".(dropped)";
|
||||
}
|
||||
|
||||
dropped = "false";
|
||||
if (table instanceof OlapTable) {
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
tableName = dbName + "." + olapTable.getName();
|
||||
} else {
|
||||
tableName = dbName + ".(not_olaptable)";
|
||||
}
|
||||
}
|
||||
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
List<String> info = new ArrayList<>();
|
||||
|
||||
info.add(tableName);
|
||||
String type = "table";
|
||||
info.add(type);
|
||||
|
||||
String id = String.valueOf(tableId);
|
||||
info.add(id);
|
||||
info.add(dropped);
|
||||
String binlogLength = String.valueOf(binlogs.size());
|
||||
info.add(binlogLength);
|
||||
String firstBinlogCommittedTime = null;
|
||||
String readableFirstBinlogCommittedTime = null;
|
||||
for (TBinlog binlog : binlogs) {
|
||||
long timestamp = binlog.getTimestamp();
|
||||
if (timestamp != -1) {
|
||||
firstBinlogCommittedTime = String.valueOf(timestamp);
|
||||
readableFirstBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp);
|
||||
break;
|
||||
}
|
||||
}
|
||||
info.add(firstBinlogCommittedTime);
|
||||
info.add(readableFirstBinlogCommittedTime);
|
||||
String lastBinlogCommittedTime = null;
|
||||
String readableLastBinlogCommittedTime = null;
|
||||
Iterator<TBinlog> iterator = binlogs.descendingIterator();
|
||||
while (iterator.hasNext()) {
|
||||
TBinlog binlog = iterator.next();
|
||||
long timestamp = binlog.getTimestamp();
|
||||
if (timestamp != -1) {
|
||||
lastBinlogCommittedTime = String.valueOf(timestamp);
|
||||
readableLastBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp);
|
||||
break;
|
||||
}
|
||||
}
|
||||
info.add(lastBinlogCommittedTime);
|
||||
info.add(readableLastBinlogCommittedTime);
|
||||
String binlogTtlSeconds = null;
|
||||
if (binlogConfig != null) {
|
||||
binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds());
|
||||
}
|
||||
info.add(binlogTtlSeconds);
|
||||
|
||||
result.addRow(info);
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,45 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.common.proc;
|
||||
|
||||
import org.apache.doris.binlog.BinlogManager;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
|
||||
|
||||
public class BinlogProcDir implements ProcDirInterface {
|
||||
@Override
|
||||
public boolean register(String name, ProcNodeInterface node) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcNodeInterface lookup(String name) throws AnalysisException {
|
||||
throw new AnalysisException("not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProcResult fetchResult() throws AnalysisException {
|
||||
BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager();
|
||||
if (binlogManager == null) {
|
||||
throw new AnalysisException("binlog manager is not initialized");
|
||||
}
|
||||
|
||||
return binlogManager.getBinlogInfo();
|
||||
}
|
||||
}
|
||||
@ -58,6 +58,7 @@ public final class ProcService {
|
||||
root.register("colocation_group", new ColocationGroupProcDir());
|
||||
root.register("bdbje", new BDBJEProcDir());
|
||||
root.register("diagnose", new DiagnoseProcDir());
|
||||
root.register("binlog", new BinlogProcDir());
|
||||
}
|
||||
|
||||
// 通过指定的路径获得对应的PROC Node
|
||||
|
||||
Reference in New Issue
Block a user