Add QUEUE_START_TIME/QUEUE_END_TIME/QUERY_STATUS column for active_queries (#32259)

This commit is contained in:
wangbo
2024-03-15 22:06:47 +08:00
committed by yiguolei
parent a15bf3057f
commit 83ab61ad22
9 changed files with 123 additions and 64 deletions

View File

@ -460,11 +460,14 @@ public class SchemaTable extends Table {
.build()))
.put("active_queries", new SchemaTable(SystemIdGenerator.getNextId(), "active_queries", TableType.SCHEMA,
builder().column("QUERY_ID", ScalarType.createVarchar(256))
.column("START_TIME", ScalarType.createVarchar(256))
.column("QUERY_START_TIME", ScalarType.createVarchar(256))
.column("QUERY_TIME_MS", ScalarType.createType(PrimitiveType.BIGINT))
.column("WORKLOAD_GROUP_ID", ScalarType.createType(PrimitiveType.BIGINT))
.column("DATABASE", ScalarType.createVarchar(256))
.column("FRONTEND_INSTANCE", ScalarType.createVarchar(256))
.column("QUEUE_START_TIME", ScalarType.createVarchar(256))
.column("QUEUE_END_TIME", ScalarType.createVarchar(256))
.column("QUERY_STATUS", ScalarType.createVarchar(256))
.column("SQL", ScalarType.createStringType())
.build()))
.put("workload_groups", new SchemaTable(SystemIdGenerator.getNextId(), "workload_groups", TableType.SCHEMA,

View File

@ -3904,6 +3904,10 @@ public class Coordinator implements CoordInterface {
}
}
public QueueToken getQueueToken() {
return queueToken;
}
// fragment instance exec param, it is used to assemble
// the per-instance TPlanFragmentExecParams, as a member of
// FragmentExecParams

View File

@ -24,6 +24,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReportExecStatusParams;
@ -265,7 +266,6 @@ public final class QeProcessorImpl implements QeProcessor {
private final ConnectContext connectContext;
private final Coordinator coord;
private final String sql;
private final long startExecTime;
// from Export, Pull load, Insert
public QueryInfo(Coordinator coord) {
@ -277,7 +277,6 @@ public final class QeProcessorImpl implements QeProcessor {
this.connectContext = connectContext;
this.coord = coord;
this.sql = sql;
this.startExecTime = System.currentTimeMillis();
}
public ConnectContext getConnectContext() {
@ -293,7 +292,31 @@ public final class QeProcessorImpl implements QeProcessor {
}
public long getStartExecTime() {
return startExecTime;
if (coord.getQueueToken() != null) {
return coord.getQueueToken().getQueueEndTime();
}
return -1;
}
public long getQueueStartTime() {
if (coord.getQueueToken() != null) {
return coord.getQueueToken().getQueueStartTime();
}
return -1;
}
public long getQueueEndTime() {
if (coord.getQueueToken() != null) {
return coord.getQueueToken().getQueueEndTime();
}
return -1;
}
public TokenState getQueueStatus() {
if (coord.getQueueToken() != null) {
return coord.getQueueToken().getTokenState();
}
return null;
}
}

View File

@ -74,7 +74,11 @@ public final class QueryStatisticsItem {
public String getQueryExecTime() {
final long currentTime = System.currentTimeMillis();
return String.valueOf(currentTime - queryStartTime);
if (queryStartTime <= 0) {
return String.valueOf(-1);
} else {
return String.valueOf(currentTime - queryStartTime);
}
}
public String getQueryId() {

View File

@ -100,7 +100,6 @@ public class QueryQueue {
}
public QueueToken getToken() throws UserException {
queueLock.lock();
try {
if (LOG.isDebugEnabled()) {
@ -108,13 +107,16 @@ public class QueryQueue {
}
if (currentRunningQueryNum < maxConcurrency) {
currentRunningQueryNum++;
return new QueueToken(TokenState.READY_TO_RUN, queueTimeout, "offer success");
QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN, queueTimeout, "offer success");
retToken.setQueueTimeWhenOfferSuccess();
return retToken;
}
if (priorityTokenQueue.size() >= maxQueueSize) {
throw new UserException("query waiting queue is full, queue length=" + maxQueueSize);
}
QueueToken newQueryToken = new QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout,
"query wait timeout " + queueTimeout + " ms");
newQueryToken.setQueueTimeWhenQueueSuccess();
this.priorityTokenQueue.offer(newQueryToken);
return newQueryToken;
} finally {

View File

@ -36,7 +36,7 @@ public class QueueToken implements Comparable<QueueToken> {
return Long.compare(this.tokenId, other.getTokenId());
}
enum TokenState {
public enum TokenState {
ENQUEUE_SUCCESS,
READY_TO_RUN
}
@ -56,6 +56,9 @@ public class QueueToken implements Comparable<QueueToken> {
private final ReentrantLock tokenLock = new ReentrantLock();
private final Condition tokenCond = tokenLock.newCondition();
private long queueStartTime = -1;
private long queueEndTime = -1;
public QueueToken(TokenState tokenState, long queueWaitTimeout,
String offerResultDetail) {
this.tokenId = tokenIdGenerator.addAndGet(1);
@ -94,6 +97,7 @@ public class QueueToken implements Comparable<QueueToken> {
return false;
} finally {
this.tokenLock.unlock();
this.setQueueTimeWhenQueueEnd();
}
}
@ -126,6 +130,33 @@ public class QueueToken implements Comparable<QueueToken> {
return this.tokenState == TokenState.READY_TO_RUN;
}
public void setQueueTimeWhenOfferSuccess() {
long currentTime = System.currentTimeMillis();
this.queueStartTime = currentTime;
this.queueEndTime = currentTime;
}
public void setQueueTimeWhenQueueSuccess() {
long currentTime = System.currentTimeMillis();
this.queueStartTime = currentTime;
}
public void setQueueTimeWhenQueueEnd() {
this.queueEndTime = System.currentTimeMillis();
}
public long getQueueStartTime() {
return queueStartTime;
}
public long getQueueEndTime() {
return queueEndTime;
}
public TokenState getTokenState() {
return tokenState;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {

View File

@ -42,6 +42,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.FrontendService;
@ -57,7 +58,6 @@ import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TSchemaTableRequestParams;
import org.apache.doris.thrift.TStatus;
@ -91,11 +91,14 @@ public class MetadataGenerator {
private static final ImmutableList<Column> ACTIVE_QUERIES_SCHEMA = ImmutableList.of(
new Column("QUERY_ID", ScalarType.createStringType()),
new Column("START_TIME", ScalarType.createStringType()),
new Column("QUERY_START_TIME", ScalarType.createStringType()),
new Column("QUERY_TIME_MS", PrimitiveType.BIGINT),
new Column("WORKLOAD_GROUP_ID", PrimitiveType.BIGINT),
new Column("DATABASE", ScalarType.createStringType()),
new Column("FRONTEND_INSTANCE", ScalarType.createStringType()),
new Column("QUEUE_START_TIME", ScalarType.createStringType()),
new Column("QUEUE_END_TIME", ScalarType.createStringType()),
new Column("QUERY_STATUS", ScalarType.createStringType()),
new Column("SQL", ScalarType.createStringType()));
private static final ImmutableMap<String, Integer> ACTIVE_QUERIES_COLUMN_TO_INDEX;
@ -490,53 +493,6 @@ public class MetadataGenerator {
return result;
}
private static TRow makeQueryStatisticsTRow(SimpleDateFormat sdf, String queryId, Backend be,
String selfNode, QueryInfo queryInfo, TQueryStatistics qs) {
TRow trow = new TRow();
if (be != null) {
trow.addToColumnValue(new TCell().setStringVal(be.getHost()));
trow.addToColumnValue(new TCell().setLongVal(be.getBePort()));
} else {
trow.addToColumnValue(new TCell().setStringVal("invalid host"));
trow.addToColumnValue(new TCell().setLongVal(-1));
}
trow.addToColumnValue(new TCell().setStringVal(queryId));
String strDate = sdf.format(new Date(queryInfo.getStartExecTime()));
trow.addToColumnValue(new TCell().setStringVal(strDate));
trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));
if (qs != null) {
trow.addToColumnValue(new TCell().setLongVal(qs.workload_group_id));
trow.addToColumnValue(new TCell().setLongVal(qs.cpu_ms));
trow.addToColumnValue(new TCell().setLongVal(qs.scan_rows));
trow.addToColumnValue(new TCell().setLongVal(qs.scan_bytes));
trow.addToColumnValue(new TCell().setLongVal(qs.max_peak_memory_bytes));
trow.addToColumnValue(new TCell().setLongVal(qs.current_used_memory_bytes));
trow.addToColumnValue(new TCell().setLongVal(qs.shuffle_send_bytes));
trow.addToColumnValue(new TCell().setLongVal(qs.shuffle_send_rows));
} else {
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
}
if (queryInfo.getConnectContext() != null) {
trow.addToColumnValue(new TCell().setStringVal(queryInfo.getConnectContext().getDatabase()));
} else {
trow.addToColumnValue(new TCell().setStringVal(""));
}
trow.addToColumnValue(new TCell().setStringVal(selfNode));
trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql()));
return trow;
}
private static TFetchSchemaTableDataResult queriesMetadataResult(TSchemaTableRequestParams tSchemaTableParams,
TFetchSchemaTableDataRequest parentRequest) {
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
@ -557,9 +513,15 @@ public class MetadataGenerator {
TRow trow = new TRow();
trow.addToColumnValue(new TCell().setStringVal(queryId));
String strDate = sdf.format(new Date(queryInfo.getStartExecTime()));
trow.addToColumnValue(new TCell().setStringVal(strDate));
trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));
long queryStartTime = queryInfo.getStartExecTime();
if (queryStartTime > 0) {
trow.addToColumnValue(new TCell().setStringVal(sdf.format(new Date(queryStartTime))));
trow.addToColumnValue(
new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));
} else {
trow.addToColumnValue(new TCell());
trow.addToColumnValue(new TCell().setLongVal(-1));
}
List<TPipelineWorkloadGroup> tgroupList = queryInfo.getCoord().gettWorkloadGroups();
if (tgroupList != null && tgroupList.size() == 1) {
@ -574,6 +536,30 @@ public class MetadataGenerator {
trow.addToColumnValue(new TCell().setStringVal(""));
}
trow.addToColumnValue(new TCell().setStringVal(selfNode));
long queueStartTime = queryInfo.getQueueStartTime();
if (queueStartTime > 0) {
trow.addToColumnValue(new TCell().setStringVal(sdf.format(new Date(queueStartTime))));
} else {
trow.addToColumnValue(new TCell());
}
long queueEndTime = queryInfo.getQueueEndTime();
if (queueEndTime > 0) {
trow.addToColumnValue(new TCell().setStringVal(sdf.format(new Date(queueEndTime))));
} else {
trow.addToColumnValue(new TCell());
}
TokenState tokenState = queryInfo.getQueueStatus();
if (tokenState == null) {
trow.addToColumnValue(new TCell());
} else if (tokenState == TokenState.READY_TO_RUN) {
trow.addToColumnValue(new TCell().setStringVal("RUNNING"));
} else {
trow.addToColumnValue(new TCell().setStringVal("QUEUED"));
}
trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql()));
dataBatch.add(trow);
}