@ -69,6 +69,7 @@ import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.Coordinator;
|
||||
import org.apache.doris.qe.InsertStreamTxnExecutor;
|
||||
import org.apache.doris.qe.QeProcessorImpl;
|
||||
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
|
||||
import org.apache.doris.qe.QueryState.MysqlStateType;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
@ -222,7 +223,8 @@ public class InsertExecutor {
|
||||
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
|
||||
coordinator.setQueryType(TQueryType.LOAD);
|
||||
executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile());
|
||||
QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), coordinator);
|
||||
QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator);
|
||||
QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo);
|
||||
coordinator.exec();
|
||||
int execTimeout = ctx.getExecTimeout();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
||||
@ -267,6 +267,10 @@ public class Coordinator implements CoordInterface {
|
||||
this.tWorkloadGroups = tWorkloadGroups;
|
||||
}
|
||||
|
||||
public List<TPipelineWorkloadGroup> gettWorkloadGroups() {
|
||||
return tWorkloadGroups;
|
||||
}
|
||||
|
||||
private List<TPipelineWorkloadGroup> tWorkloadGroups = Lists.newArrayList();
|
||||
|
||||
private final ExecutionProfile executionProfile;
|
||||
|
||||
@ -147,6 +147,7 @@ import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
|
||||
import org.apache.doris.proto.Types;
|
||||
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
|
||||
import org.apache.doris.qe.ConnectContext.ConnectType;
|
||||
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
|
||||
import org.apache.doris.qe.QueryState.MysqlStateType;
|
||||
import org.apache.doris.qe.cache.Cache;
|
||||
import org.apache.doris.qe.cache.CacheAnalyzer;
|
||||
@ -2053,8 +2054,8 @@ public class StmtExecutor {
|
||||
coord.setLoadZeroTolerance(context.getSessionVariable().getEnableInsertStrict());
|
||||
coord.setQueryType(TQueryType.LOAD);
|
||||
profile.setExecutionProfile(coord.getExecutionProfile());
|
||||
|
||||
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);
|
||||
QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), this.getOriginStmtInString(), coord);
|
||||
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), queryInfo);
|
||||
|
||||
Table table = insertStmt.getTargetTable();
|
||||
if (table instanceof OlapTable) {
|
||||
|
||||
@ -35,19 +35,10 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio
|
||||
public static final String NAME = "active_queries";
|
||||
|
||||
private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
|
||||
new Column("BeHost", ScalarType.createStringType()),
|
||||
new Column("BePort", PrimitiveType.BIGINT),
|
||||
new Column("QueryId", ScalarType.createStringType()),
|
||||
new Column("StartTime", ScalarType.createStringType()),
|
||||
new Column("QueryTimeMs", PrimitiveType.BIGINT),
|
||||
new Column("WorkloadGroupId", PrimitiveType.BIGINT),
|
||||
new Column("QueryCpuTimeMs", PrimitiveType.BIGINT),
|
||||
new Column("ScanRows", PrimitiveType.BIGINT),
|
||||
new Column("ScanBytes", PrimitiveType.BIGINT),
|
||||
new Column("BePeakMemoryBytes", PrimitiveType.BIGINT),
|
||||
new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT),
|
||||
new Column("ShuffleSendBytes", PrimitiveType.BIGINT),
|
||||
new Column("ShuffleSendRows", PrimitiveType.BIGINT),
|
||||
new Column("Database", ScalarType.createStringType()),
|
||||
new Column("FrontendInstance", ScalarType.createStringType()),
|
||||
new Column("Sql", ScalarType.createStringType()));
|
||||
|
||||
@ -56,6 +56,7 @@ import org.apache.doris.thrift.TMaterializedViewsMetadataParams;
|
||||
import org.apache.doris.thrift.TMetadataTableRequestParams;
|
||||
import org.apache.doris.thrift.TMetadataType;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TPipelineWorkloadGroup;
|
||||
import org.apache.doris.thrift.TQueriesMetadataParams;
|
||||
import org.apache.doris.thrift.TQueryStatistics;
|
||||
import org.apache.doris.thrift.TRow;
|
||||
@ -83,7 +84,6 @@ import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class MetadataGenerator {
|
||||
@ -517,7 +517,7 @@ public class MetadataGenerator {
|
||||
}
|
||||
|
||||
private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableRequestParams params,
|
||||
TFetchSchemaTableDataRequest parentRequest) {
|
||||
TFetchSchemaTableDataRequest parentRequest) {
|
||||
if (!params.isSetQueriesMetadataParams()) {
|
||||
return errorResult("queries metadata param is not set.");
|
||||
}
|
||||
@ -531,37 +531,35 @@ public class MetadataGenerator {
|
||||
}
|
||||
selfNode = NetUtils.getHostnameByIp(selfNode);
|
||||
|
||||
// get query
|
||||
Map<Long, Map<String, TQueryStatistics>> beQsMap = Env.getCurrentEnv().getWorkloadRuntimeStatusMgr()
|
||||
.getBeQueryStatsMap();
|
||||
Set<Long> beIdSet = beQsMap.keySet();
|
||||
|
||||
List<TRow> dataBatch = Lists.newArrayList();
|
||||
Map<String, QueryInfo> queryInfoMap = QeProcessorImpl.INSTANCE.getQueryInfoMap();
|
||||
|
||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
for (Long beId : beIdSet) {
|
||||
Map<String, TQueryStatistics> qsMap = beQsMap.get(beId);
|
||||
if (qsMap == null) {
|
||||
continue;
|
||||
for (Map.Entry<String, QueryInfo> entry : queryInfoMap.entrySet()) {
|
||||
String queryId = entry.getKey();
|
||||
QueryInfo queryInfo = entry.getValue();
|
||||
|
||||
TRow trow = new TRow();
|
||||
trow.addToColumnValue(new TCell().setStringVal(queryId));
|
||||
|
||||
String strDate = sdf.format(new Date(queryInfo.getStartExecTime()));
|
||||
trow.addToColumnValue(new TCell().setStringVal(strDate));
|
||||
trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));
|
||||
|
||||
List<TPipelineWorkloadGroup> tgroupList = queryInfo.getCoord().gettWorkloadGroups();
|
||||
if (tgroupList != null && tgroupList.size() == 1) {
|
||||
trow.addToColumnValue(new TCell().setLongVal(tgroupList.get(0).id));
|
||||
} else {
|
||||
trow.addToColumnValue(new TCell().setLongVal(-1));
|
||||
}
|
||||
Set<String> queryIdSet = qsMap.keySet();
|
||||
for (String queryId : queryIdSet) {
|
||||
QueryInfo queryInfo = queryInfoMap.get(queryId);
|
||||
if (queryInfo == null) {
|
||||
continue;
|
||||
}
|
||||
//todo(wb) add connect context for insert select
|
||||
if (queryInfo.getConnectContext() != null && !Env.getCurrentEnv().getAccessManager()
|
||||
.checkDbPriv(queryInfo.getConnectContext(), queryInfo.getConnectContext().getDatabase(),
|
||||
PrivPredicate.SELECT)) {
|
||||
continue;
|
||||
}
|
||||
TQueryStatistics qs = qsMap.get(queryId);
|
||||
Backend be = Env.getCurrentEnv().getClusterInfo().getBackend(beId);
|
||||
TRow tRow = makeQueryStatisticsTRow(sdf, queryId, be, selfNode, queryInfo, qs);
|
||||
dataBatch.add(tRow);
|
||||
|
||||
if (queryInfo.getConnectContext() != null) {
|
||||
trow.addToColumnValue(new TCell().setStringVal(queryInfo.getConnectContext().getDatabase()));
|
||||
} else {
|
||||
trow.addToColumnValue(new TCell().setStringVal(""));
|
||||
}
|
||||
trow.addToColumnValue(new TCell().setStringVal(selfNode));
|
||||
trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql()));
|
||||
dataBatch.add(trow);
|
||||
}
|
||||
|
||||
/* Get the query results from other FE also */
|
||||
|
||||
Reference in New Issue
Block a user