[improvement] support show query stmt in show processlist (#11232)
This commit is contained in:
@ -2654,7 +2654,7 @@ show_param ::=
|
||||
/* show processlist */
|
||||
| opt_full KW_PROCESSLIST
|
||||
{:
|
||||
RESULT = new ShowProcesslistStmt();
|
||||
RESULT = new ShowProcesslistStmt(parser.isVerbose);
|
||||
:}
|
||||
/* routine */
|
||||
| procedure_or_function KW_STATUS opt_wild_where
|
||||
|
||||
@ -25,18 +25,26 @@ import org.apache.doris.qe.ShowResultSetMetaData;
|
||||
// SHOW PROCESSLIST statement.
|
||||
// Used to show connection belong to this user.
|
||||
public class ShowProcesslistStmt extends ShowStmt {
|
||||
private static final ShowResultSetMetaData META_DATA =
|
||||
ShowResultSetMetaData.builder()
|
||||
.addColumn(new Column("Id", ScalarType.createType(PrimitiveType.BIGINT)))
|
||||
.addColumn(new Column("User", ScalarType.createVarchar(16)))
|
||||
.addColumn(new Column("Host", ScalarType.createVarchar(16)))
|
||||
.addColumn(new Column("Cluster", ScalarType.createVarchar(16)))
|
||||
.addColumn(new Column("Db", ScalarType.createVarchar(16)))
|
||||
.addColumn(new Column("Command", ScalarType.createVarchar(16)))
|
||||
.addColumn(new Column("Time", ScalarType.createType(PrimitiveType.INT)))
|
||||
.addColumn(new Column("State", ScalarType.createVarchar(64)))
|
||||
.addColumn(new Column("Info", ScalarType.createVarchar(16)))
|
||||
.build();
|
||||
private static final ShowResultSetMetaData META_DATA = ShowResultSetMetaData.builder()
|
||||
.addColumn(new Column("Id", ScalarType.createType(PrimitiveType.BIGINT)))
|
||||
.addColumn(new Column("User", ScalarType.createVarchar(16)))
|
||||
.addColumn(new Column("Host", ScalarType.createVarchar(16)))
|
||||
.addColumn(new Column("Cluster", ScalarType.createVarchar(16)))
|
||||
.addColumn(new Column("Db", ScalarType.createVarchar(16)))
|
||||
.addColumn(new Column("Command", ScalarType.createVarchar(16)))
|
||||
.addColumn(new Column("Time", ScalarType.createType(PrimitiveType.INT)))
|
||||
.addColumn(new Column("State", ScalarType.createVarchar(64)))
|
||||
.addColumn(new Column("Info", ScalarType.STRING)).build();
|
||||
|
||||
private boolean isFull;
|
||||
|
||||
public ShowProcesslistStmt(boolean isFull) {
|
||||
this.isFull = isFull;
|
||||
}
|
||||
|
||||
public boolean isFull() {
|
||||
return isFull;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) {
|
||||
@ -44,7 +52,7 @@ public class ShowProcesslistStmt extends ShowStmt {
|
||||
|
||||
@Override
|
||||
public String toSql() {
|
||||
return "SHOW PROCESSLIST";
|
||||
return "SHOW " + (isFull ? "FULL" : "") + "PROCESSLIST";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -61,7 +61,8 @@ public class SessionController extends BaseController {
|
||||
}
|
||||
|
||||
private void appendSessionInfo(Map<String, Object> result) {
|
||||
List<ConnectContext.ThreadInfo> threadInfos = ExecuteEnv.getInstance().getScheduler().listConnection("root");
|
||||
List<ConnectContext.ThreadInfo> threadInfos = ExecuteEnv.getInstance().getScheduler()
|
||||
.listConnection("root", false);
|
||||
List<List<String>> rows = Lists.newArrayList();
|
||||
|
||||
result.put("column_names", SESSION_TABLE_HEADER);
|
||||
|
||||
@ -504,8 +504,8 @@ public class ConnectContext {
|
||||
|
||||
// kill operation with no protect.
|
||||
public void kill(boolean killConnection) {
|
||||
LOG.warn("kill timeout query, {}, kill connection: {}",
|
||||
getMysqlChannel().getRemoteHostPortString(), killConnection);
|
||||
LOG.warn("kill query from {}, kill connection: {}", getMysqlChannel().getRemoteHostPortString(),
|
||||
killConnection);
|
||||
|
||||
if (killConnection) {
|
||||
isKilled = true;
|
||||
@ -551,10 +551,11 @@ public class ConnectContext {
|
||||
}
|
||||
|
||||
// Helper to dump connection information.
|
||||
public ThreadInfo toThreadInfo() {
|
||||
public ThreadInfo toThreadInfo(boolean isFull) {
|
||||
if (threadInfo == null) {
|
||||
threadInfo = new ThreadInfo();
|
||||
}
|
||||
threadInfo.isFull = isFull;
|
||||
return threadInfo;
|
||||
}
|
||||
|
||||
@ -584,6 +585,8 @@ public class ConnectContext {
|
||||
}
|
||||
|
||||
public class ThreadInfo {
|
||||
public boolean isFull;
|
||||
|
||||
public List<String> toRow(long nowMs) {
|
||||
List<String> row = Lists.newArrayList();
|
||||
row.add("" + connectionId);
|
||||
@ -594,7 +597,15 @@ public class ConnectContext {
|
||||
row.add(command.toString());
|
||||
row.add("" + (nowMs - startTime) / 1000);
|
||||
row.add("");
|
||||
row.add("");
|
||||
if (queryId != null) {
|
||||
String sql = QeProcessorImpl.INSTANCE.getCurrentQueryByQueryId(queryId);
|
||||
if (!isFull) {
|
||||
sql = sql.substring(0, Math.min(sql.length(), 100));
|
||||
}
|
||||
row.add(sql);
|
||||
} else {
|
||||
row.add("");
|
||||
}
|
||||
return row;
|
||||
}
|
||||
}
|
||||
@ -604,3 +615,4 @@ public class ConnectContext {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -136,17 +136,16 @@ public class ConnectScheduler {
|
||||
return numberConnection.get();
|
||||
}
|
||||
|
||||
public List<ConnectContext.ThreadInfo> listConnection(String user) {
|
||||
public List<ConnectContext.ThreadInfo> listConnection(String user, boolean isFull) {
|
||||
List<ConnectContext.ThreadInfo> infos = Lists.newArrayList();
|
||||
for (ConnectContext ctx : connectionMap.values()) {
|
||||
// Check auth
|
||||
if (!ctx.getQualifiedUser().equals(user)
|
||||
&& !Env.getCurrentEnv().getAuth().checkGlobalPriv(ConnectContext.get(),
|
||||
PrivPredicate.GRANT)) {
|
||||
if (!ctx.getQualifiedUser().equals(user) && !Env.getCurrentEnv().getAuth()
|
||||
.checkGlobalPriv(ConnectContext.get(), PrivPredicate.GRANT)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
infos.add(ctx.toThreadInfo());
|
||||
infos.add(ctx.toThreadInfo(isFull));
|
||||
}
|
||||
return infos;
|
||||
}
|
||||
|
||||
@ -39,5 +39,7 @@ public interface QeProcessor {
|
||||
|
||||
Map<String, QueryStatisticsItem> getQueryStatistics();
|
||||
|
||||
String getCurrentQueryByQueryId(TUniqueId queryId);
|
||||
|
||||
Coordinator getCoordinator(TUniqueId queryId);
|
||||
}
|
||||
|
||||
@ -200,6 +200,15 @@ public final class QeProcessorImpl implements QeProcessor {
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCurrentQueryByQueryId(TUniqueId queryId) {
|
||||
QueryInfo info = coordinatorMap.get(queryId);
|
||||
if (info != null && info.sql != null) {
|
||||
return info.sql;
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
public static final class QueryInfo {
|
||||
private final ConnectContext connectContext;
|
||||
private final Coordinator coord;
|
||||
|
||||
@ -375,7 +375,8 @@ public class ShowExecutor {
|
||||
ShowProcesslistStmt showStmt = (ShowProcesslistStmt) stmt;
|
||||
List<List<String>> rowSet = Lists.newArrayList();
|
||||
|
||||
List<ConnectContext.ThreadInfo> threadInfos = ctx.getConnectScheduler().listConnection(ctx.getQualifiedUser());
|
||||
List<ConnectContext.ThreadInfo> threadInfos = ctx.getConnectScheduler()
|
||||
.listConnection(ctx.getQualifiedUser(), showStmt.isFull());
|
||||
long nowMs = System.currentTimeMillis();
|
||||
for (ConnectContext.ThreadInfo info : threadInfos) {
|
||||
rowSet.add(info.toRow(nowMs));
|
||||
|
||||
@ -1004,7 +1004,6 @@ public class StmtExecutor implements ProfileWriter {
|
||||
sendResult(isOutfileQuery, false, queryStmt, channel, null, null);
|
||||
}
|
||||
|
||||
|
||||
private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable queryStmt, MysqlChannel channel,
|
||||
CacheAnalyzer cacheAnalyzer, InternalService.PFetchCacheResult cacheResult) throws Exception {
|
||||
// 1. If this is a query with OUTFILE clause, eg: select * from tbl1 into outfile xxx,
|
||||
|
||||
@ -116,8 +116,8 @@ public class ConnectContextTest {
|
||||
Assert.assertEquals(MysqlCommand.COM_PING, ctx.getCommand());
|
||||
|
||||
// Thread info
|
||||
Assert.assertNotNull(ctx.toThreadInfo());
|
||||
List<String> row = ctx.toThreadInfo().toRow(1000);
|
||||
Assert.assertNotNull(ctx.toThreadInfo(false));
|
||||
List<String> row = ctx.toThreadInfo(false).toRow(1000);
|
||||
Assert.assertEquals(9, row.size());
|
||||
Assert.assertEquals("101", row.get(0));
|
||||
Assert.assertEquals("testUser", row.get(1));
|
||||
|
||||
@ -249,9 +249,9 @@ public class ShowExecutorTest {
|
||||
ConnectScheduler scheduler = new ConnectScheduler(10);
|
||||
new Expectations(scheduler) {
|
||||
{
|
||||
scheduler.listConnection("testCluster:testUser");
|
||||
scheduler.listConnection("testCluster:testUser", anyBoolean);
|
||||
minTimes = 0;
|
||||
result = Lists.newArrayList(ctx.toThreadInfo());
|
||||
result = Lists.newArrayList(ctx.toThreadInfo(false));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user