[Feature](executor)Support ShowProcessStmt Show all Fe connection (#30907)

This commit is contained in:
wangbo
2024-02-08 15:51:47 +08:00
committed by yiguolei
parent 7f50998406
commit 24e80b23a5
8 changed files with 127 additions and 8 deletions

View File

@ -20,6 +20,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
// SHOW PROCESSLIST statement.
@ -39,7 +40,23 @@ public class ShowProcesslistStmt extends ShowStmt {
.addColumn(new Column("QueryId", ScalarType.createVarchar(64)))
.addColumn(new Column("Info", ScalarType.STRING)).build();
private static final ShowResultSetMetaData ALL_META_DATA = ShowResultSetMetaData.builder()
.addColumn(new Column("CurrentConnected", ScalarType.createVarchar(16)))
.addColumn(new Column("Id", ScalarType.createType(PrimitiveType.BIGINT)))
.addColumn(new Column("User", ScalarType.createVarchar(16)))
.addColumn(new Column("Host", ScalarType.createVarchar(16)))
.addColumn(new Column("LoginTime", ScalarType.createVarchar(16)))
.addColumn(new Column("Catalog", ScalarType.createVarchar(16)))
.addColumn(new Column("Db", ScalarType.createVarchar(16)))
.addColumn(new Column("Command", ScalarType.createVarchar(16)))
.addColumn(new Column("Time", ScalarType.createType(PrimitiveType.INT)))
.addColumn(new Column("State", ScalarType.createVarchar(64)))
.addColumn(new Column("QueryId", ScalarType.createVarchar(64)))
.addColumn(new Column("Info", ScalarType.STRING))
.addColumn(new Column("FE", ScalarType.createVarchar(16))).build();
private boolean isFull;
private boolean isShowAllFe;
public ShowProcesslistStmt(boolean isFull) {
this.isFull = isFull;
@ -51,6 +68,11 @@ public class ShowProcesslistStmt extends ShowStmt {
@Override
public void analyze(Analyzer analyzer) {
this.isShowAllFe = ConnectContext.get().getSessionVariable().getShowAllFeConnection();
}
public boolean isShowAllFe() {
return isShowAllFe;
}
@Override
@ -65,6 +87,6 @@ public class ShowProcesslistStmt extends ShowStmt {
@Override
public ShowResultSetMetaData getMetaData() {
return META_DATA;
return isShowAllFe ? ALL_META_DATA : META_DATA;
}
}

View File

@ -53,7 +53,7 @@ public class SessionController extends RestBaseController {
private static final List<String> SESSION_TABLE_HEADER = Lists.newArrayList();
private static final List<String> ALL_SESSION_TABLE_HEADER = Lists.newArrayList("FE");
private static final List<String> ALL_SESSION_TABLE_HEADER = Lists.newArrayList();
private static final Logger LOG = LogManager.getLogger(SessionController.class);
@ -71,6 +71,7 @@ public class SessionController extends RestBaseController {
SESSION_TABLE_HEADER.add("QueryId");
SESSION_TABLE_HEADER.add("Info");
ALL_SESSION_TABLE_HEADER.addAll(SESSION_TABLE_HEADER);
ALL_SESSION_TABLE_HEADER.add("FE");
}
@RequestMapping(path = "/session/all", method = RequestMethod.GET)

View File

@ -980,9 +980,6 @@ public class ConnectContext {
public List<String> toRow(int connId, long nowMs, boolean showFe) {
List<String> row = Lists.newArrayList();
if (showFe) {
row.add(Env.getCurrentEnv().getSelfNode().getHost());
}
if (connId == connectionId) {
row.add("Yes");
} else {
@ -1009,6 +1006,11 @@ public class ConnectContext {
} else {
row.add("");
}
if (showFe) {
row.add(Env.getCurrentEnv().getSelfNode().getHost());
}
return row;
}
}

View File

@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
@ -171,6 +172,16 @@ public class ConnectScheduler {
return infos;
}
// used for thrift
public List<List<String>> listConnectionWithoutAuth(boolean isShowFullSql, boolean isShowFeHost) {
List<List<String>> list = new ArrayList<>();
long nowMs = System.currentTimeMillis();
for (ConnectContext ctx : connectionMap.values()) {
list.add(ctx.toThreadInfo(isShowFullSql).toRow(-1, nowMs, isShowFeHost));
}
return list;
}
public void putTraceId2QueryId(String traceId, TUniqueId queryId) {
traceId2QueryId.put(traceId, queryId);
}

View File

@ -507,6 +507,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
public static final String SHOW_ALL_FE_CONNECTION = "show_all_fe_connection";
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
@ -1610,6 +1612,11 @@ public class SessionVariable implements Serializable, Writable {
"use other health replica when the use_fix_replica meet error" })
public boolean fallbackOtherReplicaWhenFixedCorrupt = false;
@VariableMgr.VarAttr(name = SHOW_ALL_FE_CONNECTION,
description = {"when it's true show processlist statement list all fe's connection",
"当变量为true时,show processlist命令展示所有fe的连接"})
public boolean showAllFeConnection = false;
// CLOUD_VARIABLES_BEGIN
@VariableMgr.VarAttr(name = CLOUD_CLUSTER)
public String cloudCluster = "";
@ -3365,4 +3372,9 @@ public class SessionVariable implements Serializable, Writable {
public void setForceToLocalShuffle(boolean forceToLocalShuffle) {
this.forceToLocalShuffle = forceToLocalShuffle;
}
public boolean getShowAllFeConnection() {
return this.showAllFeConnection;
}
}

View File

@ -145,6 +145,7 @@ import org.apache.doris.clone.DynamicPartitionScheduler;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.DdlException;
@ -208,7 +209,11 @@ import org.apache.doris.task.AgentClient;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.SnapshotTask;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TCheckStorageFormatResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TShowProcessListRequest;
import org.apache.doris.thrift.TShowProcessListResult;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUnit;
import org.apache.doris.transaction.GlobalTransactionMgr;
@ -454,13 +459,53 @@ public class ShowExecutor {
// Handle show processlist
private void handleShowProcesslist() {
ShowProcesslistStmt showStmt = (ShowProcesslistStmt) stmt;
List<List<String>> rowSet = Lists.newArrayList();
boolean isShowFullSql = showStmt.isFull();
boolean isShowAllFe = showStmt.isShowAllFe();
List<List<String>> rowSet = Lists.newArrayList();
List<ConnectContext.ThreadInfo> threadInfos = ctx.getConnectScheduler()
.listConnection(ctx.getQualifiedUser(), showStmt.isFull());
.listConnection(ctx.getQualifiedUser(), isShowFullSql);
long nowMs = System.currentTimeMillis();
for (ConnectContext.ThreadInfo info : threadInfos) {
rowSet.add(info.toRow(ctx.getConnectionId(), nowMs, false));
rowSet.add(info.toRow(ctx.getConnectionId(), nowMs, isShowAllFe));
}
if (isShowAllFe) {
try {
TShowProcessListRequest request = new TShowProcessListRequest();
request.setShowFullSql(isShowFullSql);
List<Pair<String, Integer>> frontends = FrontendsProcNode.getFrontendWithRpcPort(Env.getCurrentEnv(),
false);
FrontendService.Client client = null;
for (Pair<String, Integer> fe : frontends) {
TNetworkAddress thriftAddress = new TNetworkAddress(fe.key(), fe.value());
try {
client = ClientPool.frontendPool.borrowObject(thriftAddress, 3000);
} catch (Exception e) {
LOG.warn("Failed to get frontend {} client. exception: {}", fe.key(), e);
continue;
}
boolean isReturnToPool = false;
try {
TShowProcessListResult result = client.showProcessList(request);
if (result.process_list != null && result.process_list.size() > 0) {
rowSet.addAll(result.process_list);
}
isReturnToPool = true;
} catch (Exception e) {
LOG.warn("Failed to request processlist to fe: {} . exception: {}", fe.key(), e);
} finally {
if (isReturnToPool) {
ClientPool.frontendPool.returnObject(thriftAddress, client);
} else {
ClientPool.frontendPool.invalidateObject(thriftAddress, client);
}
}
}
} catch (Throwable t) {
LOG.warn(" fetch process list from other fe failed, ", t);
}
}
resultSet = new ShowResultSet(showStmt.getMetaData(), rowSet);

View File

@ -209,6 +209,8 @@ import org.apache.doris.thrift.TRestoreSnapshotRequest;
import org.apache.doris.thrift.TRestoreSnapshotResult;
import org.apache.doris.thrift.TRollbackTxnRequest;
import org.apache.doris.thrift.TRollbackTxnResult;
import org.apache.doris.thrift.TShowProcessListRequest;
import org.apache.doris.thrift.TShowProcessListResult;
import org.apache.doris.thrift.TShowVariableRequest;
import org.apache.doris.thrift.TShowVariableResult;
import org.apache.doris.thrift.TSnapshotLoaderReportRequest;
@ -3555,4 +3557,18 @@ public class FrontendServiceImpl implements FrontendService.Iface {
throw e;
}
}
@Override
public TShowProcessListResult showProcessList(TShowProcessListRequest request) {
boolean isShowFullSql = false;
if (request.isSetShowFullSql()) {
isShowFullSql = request.isShowFullSql();
}
List<List<String>> processList = ExecuteEnv.getInstance().getScheduler()
.listConnectionWithoutAuth(isShowFullSql, true);
TShowProcessListResult result = new TShowProcessListResult();
result.setProcessList(processList);
return result;
}
}