[feature](show) add new statement show proc '/current_query_stmts' (#7487)
To show the the query statement at first level.
This commit is contained in:
@ -46,14 +46,9 @@ public class ShowProcStmt extends ShowStmt {
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws AnalysisException {
|
||||
if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
|
||||
"ADMIN");
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
|
||||
}
|
||||
|
||||
node = ProcService.getInstance().open(path);
|
||||
if (node == null) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_PROC_PATH, path);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -19,11 +19,8 @@ package org.apache.doris.common.proc;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.qe.QueryStatisticsItem;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.common.util.QueryStatisticsFormatter;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -52,23 +49,6 @@ public class CurrentQueryFragmentProcNode implements ProcNodeInterface {
|
||||
return requestFragmentExecInfos();
|
||||
}
|
||||
|
||||
private TNetworkAddress toBrpcHost(TNetworkAddress host) throws AnalysisException {
|
||||
final Backend backend = Catalog.getCurrentSystemInfo().getBackendWithBePort(
|
||||
host.getHostname(), host.getPort());
|
||||
if (backend == null) {
|
||||
throw new AnalysisException(new StringBuilder("Backend ")
|
||||
.append(host.getHostname())
|
||||
.append(":")
|
||||
.append(host.getPort())
|
||||
.append(" does not exist")
|
||||
.toString());
|
||||
}
|
||||
if (backend.getBrpcPort() < 0) {
|
||||
throw new AnalysisException("BRPC port isn't exist.");
|
||||
}
|
||||
return new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
|
||||
}
|
||||
|
||||
private ProcResult requestFragmentExecInfos() throws AnalysisException {
|
||||
final CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider();
|
||||
final Collection<CurrentQueryInfoProvider.InstanceStatistics> instanceStatisticsCollection
|
||||
|
||||
@ -0,0 +1,72 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.common.proc;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.qe.QeProcessorImpl;
|
||||
import org.apache.doris.qe.QueryStatisticsItem;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/*
|
||||
* show proc "/current_query_stmts"
|
||||
*/
|
||||
public class CurrentQueryStatementsProcNode implements ProcNodeInterface {
|
||||
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
|
||||
.add("QueryId").add("ConnectionId").add("Database").add("User")
|
||||
.add("ExecTime").add("SqlHash").add("Statement").build();
|
||||
|
||||
private static final int EXEC_TIME_INDEX = 5;
|
||||
|
||||
@Override
|
||||
public ProcResult fetchResult() throws AnalysisException {
|
||||
final BaseProcResult result = new BaseProcResult();
|
||||
final Map<String, QueryStatisticsItem> statistic =
|
||||
QeProcessorImpl.INSTANCE.getQueryStatistics();
|
||||
result.setNames(TITLE_NAMES.asList());
|
||||
final List<List<String>> sortedRowData = Lists.newArrayList();
|
||||
|
||||
final CurrentQueryInfoProvider provider = new CurrentQueryInfoProvider();
|
||||
final Map<String, CurrentQueryInfoProvider.QueryStatistics> statisticsMap
|
||||
= provider.getQueryStatistics(statistic.values());
|
||||
for (QueryStatisticsItem item : statistic.values()) {
|
||||
final List<String> values = Lists.newArrayList();
|
||||
values.add(item.getQueryId());
|
||||
values.add(item.getConnId());
|
||||
values.add(item.getDb());
|
||||
values.add(item.getUser());
|
||||
values.add(item.getQueryExecTime());
|
||||
values.add(DigestUtils.md5Hex(item.getSql()));
|
||||
values.add(item.getSql());
|
||||
sortedRowData.add(values);
|
||||
}
|
||||
|
||||
// sort according to ExecTime
|
||||
sortedRowData.sort((l1, l2) -> {
|
||||
final int execTime1 = Integer.parseInt(l1.get(EXEC_TIME_INDEX));
|
||||
final int execTime2 = Integer.parseInt(l2.get(EXEC_TIME_INDEX));
|
||||
return execTime2 - execTime1;
|
||||
});
|
||||
result.setRows(sortedRowData);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@ -26,10 +26,6 @@ import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@ -38,7 +34,6 @@ import java.util.Map;
|
||||
* only set variable "set is_report_success = true" to enable "ScanBytes" and "ProcessRows".
|
||||
*/
|
||||
public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
|
||||
private static final Logger LOG = LogManager.getLogger(CurrentQueryStatisticsProcDir.class);
|
||||
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
|
||||
.add("QueryId").add("ConnectionId").add("Database").add("User")
|
||||
.add("ScanBytes").add("ProcessRows").add("ExecTime").build();
|
||||
@ -95,13 +90,10 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
|
||||
sortedRowData.add(values);
|
||||
}
|
||||
// sort according to ExecTime
|
||||
sortedRowData.sort(new Comparator<List<String>>() {
|
||||
@Override
|
||||
public int compare(List<String> l1, List<String> l2) {
|
||||
final int execTime1 = Integer.valueOf(l1.get(EXEC_TIME_INDEX));
|
||||
final int execTime2 = Integer.valueOf(l2.get(EXEC_TIME_INDEX));
|
||||
return execTime1 <= execTime2 ? 1 : -1;
|
||||
}
|
||||
sortedRowData.sort((l1, l2) -> {
|
||||
final int execTime1 = Integer.parseInt(l1.get(EXEC_TIME_INDEX));
|
||||
final int execTime2 = Integer.parseInt(l2.get(EXEC_TIME_INDEX));
|
||||
return execTime2 - execTime1;
|
||||
});
|
||||
result.setRows(sortedRowData);
|
||||
return result;
|
||||
|
||||
@ -22,6 +22,8 @@ import org.apache.doris.common.AnalysisException;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -48,6 +50,7 @@ public final class ProcService {
|
||||
root.register("trash", new TrashProcDir());
|
||||
root.register("monitor", new MonitorProcDir());
|
||||
root.register("current_queries", new CurrentQueryStatisticsProcDir());
|
||||
root.register("current_query_stmts", new CurrentQueryStatementsProcNode());
|
||||
root.register("current_backend_instances", new CurrentQueryBackendInstanceProcDir());
|
||||
root.register("cluster_balance", new ClusterBalanceProcDir());
|
||||
root.register("routine_loads", new RoutineLoadsProcDir());
|
||||
@ -120,7 +123,7 @@ public final class ProcService {
|
||||
// the last character of path is '/', the current is must a directory
|
||||
if (pos == last) {
|
||||
// now pos == path.length()
|
||||
if (curNode == null || !(curNode instanceof ProcDirInterface)) {
|
||||
if (!(curNode instanceof ProcDirInterface)) {
|
||||
String errMsg = path + " is not a directory";
|
||||
LOG.warn(errMsg);
|
||||
throw new AnalysisException(errMsg);
|
||||
@ -137,7 +140,7 @@ public final class ProcService {
|
||||
// 这里使用pos,因为有可能path后面会有space字段被提前截断
|
||||
curNode = ((ProcDirInterface) curNode).lookup(path.substring(last, pos));
|
||||
if (curNode == null) {
|
||||
throw new AnalysisException("Cannot find path: " + path);
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_PROC_PATH, path);
|
||||
}
|
||||
return curNode;
|
||||
}
|
||||
@ -145,7 +148,7 @@ public final class ProcService {
|
||||
// 将node注册到根节点下的name下
|
||||
public synchronized boolean register(String name, ProcNodeInterface node) {
|
||||
if (Strings.isNullOrEmpty(name) || node == null) {
|
||||
LOG.warn("register porc service invalid input.");
|
||||
LOG.warn("register proc service invalid input.");
|
||||
return false;
|
||||
}
|
||||
if (root.lookup(name) != null) {
|
||||
|
||||
Reference in New Issue
Block a user