[Feature](profile)Support report runtime workload statistics #29591
This commit is contained in:
@ -230,6 +230,7 @@ import org.apache.doris.qe.QueryCancelWorker;
|
||||
import org.apache.doris.qe.VariableMgr;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
|
||||
import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
|
||||
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
|
||||
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher;
|
||||
import org.apache.doris.scheduler.manager.TransientTaskManager;
|
||||
@ -491,6 +492,9 @@ public class Env {
|
||||
private WorkloadGroupMgr workloadGroupMgr;
|
||||
|
||||
private WorkloadSchedPolicyMgr workloadSchedPolicyMgr;
|
||||
|
||||
private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr;
|
||||
|
||||
private QueryStats queryStats;
|
||||
|
||||
private StatisticsCleaner statisticsCleaner;
|
||||
@ -739,6 +743,7 @@ public class Env {
|
||||
this.globalFunctionMgr = new GlobalFunctionMgr();
|
||||
this.workloadGroupMgr = new WorkloadGroupMgr();
|
||||
this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
|
||||
this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr();
|
||||
this.queryStats = new QueryStats();
|
||||
this.loadManagerAdapter = new LoadManagerAdapter();
|
||||
this.hiveTransactionMgr = new HiveTransactionMgr();
|
||||
@ -835,6 +840,10 @@ public class Env {
|
||||
return workloadSchedPolicyMgr;
|
||||
}
|
||||
|
||||
public WorkloadRuntimeStatusMgr getWorkloadRuntimeStatusMgr() {
|
||||
return workloadRuntimeStatusMgr;
|
||||
}
|
||||
|
||||
// use this to get correct ClusterInfoService instance
|
||||
public static SystemInfoService getCurrentSystemInfo() {
|
||||
return getCurrentEnv().getClusterInfo();
|
||||
@ -1014,6 +1023,7 @@ public class Env {
|
||||
workloadGroupMgr.startUpdateThread();
|
||||
workloadSchedPolicyMgr.start();
|
||||
workloadActionPublisherThread.start();
|
||||
workloadRuntimeStatusMgr.start();
|
||||
}
|
||||
|
||||
// wait until FE is ready.
|
||||
|
||||
@ -100,6 +100,8 @@ public class AuditEvent {
|
||||
@AuditField(value = "FuzzyVariables")
|
||||
public String fuzzyVariables = "";
|
||||
|
||||
public long pushToAuditLogQueueTime;
|
||||
|
||||
public static class AuditEventBuilder {
|
||||
|
||||
private AuditEvent auditEvent = new AuditEvent();
|
||||
|
||||
@ -119,6 +119,6 @@ public class AuditLogHelper {
|
||||
}
|
||||
}
|
||||
}
|
||||
Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build());
|
||||
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(ctx.getAuditEventBuilder().build());
|
||||
}
|
||||
}
|
||||
|
||||
@ -271,7 +271,8 @@ public abstract class ConnectProcessor {
|
||||
break;
|
||||
}
|
||||
}
|
||||
auditAfterExec(auditStmt, executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(), true);
|
||||
auditAfterExec(auditStmt, executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(),
|
||||
true);
|
||||
// execute failed, skip remaining stmts
|
||||
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
|
||||
break;
|
||||
|
||||
@ -201,6 +201,15 @@ public final class QeProcessorImpl implements QeProcessor {
|
||||
LOG.debug("params: {}", params);
|
||||
}
|
||||
final TReportExecStatusResult result = new TReportExecStatusResult();
|
||||
|
||||
if (params.isSetReportWorkloadRuntimeStatus()) {
|
||||
Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().updateBeQueryStats(params.report_workload_runtime_status);
|
||||
if (!params.isSetQueryId()) {
|
||||
result.setStatus(new TStatus(TStatusCode.OK));
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
final QueryInfo info = coordinatorMap.get(params.query_id);
|
||||
|
||||
if (info == null) {
|
||||
|
||||
@ -0,0 +1,223 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.resource.workloadschedpolicy;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.Daemon;
|
||||
import org.apache.doris.plugin.AuditEvent;
|
||||
import org.apache.doris.thrift.TQueryStatistics;
|
||||
import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
public class WorkloadRuntimeStatusMgr {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
|
||||
private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap = Maps.newConcurrentMap();
|
||||
private Map<Long, Long> beLastReportTime = Maps.newConcurrentMap();
|
||||
private Map<String, Long> queryLastReportTime = Maps.newConcurrentMap();
|
||||
private final ReentrantReadWriteLock queryAuditEventLock = new ReentrantReadWriteLock();
|
||||
private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
|
||||
|
||||
class WorkloadRuntimeStatsThread extends Daemon {
|
||||
|
||||
WorkloadRuntimeStatusMgr workloadStatsMgr;
|
||||
|
||||
public WorkloadRuntimeStatsThread(WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr, String threadName,
|
||||
int interval) {
|
||||
super(threadName, interval);
|
||||
this.workloadStatsMgr = workloadRuntimeStatusMgr;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runOneCycle() {
|
||||
// 1 merge be query statistics
|
||||
Map<String, TQueryStatistics> queryStatisticsMap = workloadStatsMgr.getQueryStatisticsMap();
|
||||
|
||||
// 2 log query audit
|
||||
List<AuditEvent> auditEventList = workloadStatsMgr.getQueryNeedAudit();
|
||||
for (AuditEvent auditEvent : auditEventList) {
|
||||
TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId);
|
||||
if (queryStats != null) {
|
||||
auditEvent.scanRows = queryStats.scan_rows;
|
||||
auditEvent.scanBytes = queryStats.scan_bytes;
|
||||
auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes;
|
||||
}
|
||||
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
|
||||
}
|
||||
|
||||
// 3 clear beToQueryStatsMap when be report timeout
|
||||
workloadStatsMgr.clearReportTimeoutBeStatistics();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private Daemon thread = null;
|
||||
|
||||
public void submitFinishQueryToAudit(AuditEvent event) {
|
||||
queryAuditEventLogWriteLock();
|
||||
try {
|
||||
event.pushToAuditLogQueueTime = System.currentTimeMillis();
|
||||
queryAuditEventList.add(event);
|
||||
} finally {
|
||||
queryAuditEventLogWriteUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public List<AuditEvent> getQueryNeedAudit() {
|
||||
List<AuditEvent> ret = new ArrayList<>();
|
||||
long currentTime = System.currentTimeMillis();
|
||||
queryAuditEventLogWriteLock();
|
||||
try {
|
||||
int queryAuditLogTimeout = Config.query_audit_log_timeout_ms;
|
||||
Iterator<AuditEvent> iter = queryAuditEventList.iterator();
|
||||
while (iter.hasNext()) {
|
||||
AuditEvent ae = iter.next();
|
||||
if (currentTime - ae.pushToAuditLogQueueTime > queryAuditLogTimeout) {
|
||||
ret.add(ae);
|
||||
iter.remove();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
queryAuditEventLogWriteUnlock();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
public void start() {
|
||||
thread = new WorkloadRuntimeStatsThread(this, "workload-runtime-stats-thread",
|
||||
Config.workload_runtime_status_thread_interval_ms);
|
||||
thread.start();
|
||||
}
|
||||
|
||||
public void updateBeQueryStats(TReportWorkloadRuntimeStatusParams params) {
|
||||
if (!params.isSetBackendId()) {
|
||||
LOG.warn("be report workload runtime status but without beid");
|
||||
return;
|
||||
}
|
||||
if (!params.isSetQueryStatisticsMap()) {
|
||||
LOG.warn("be report workload runtime status but without query stats map");
|
||||
return;
|
||||
}
|
||||
long beId = params.backend_id;
|
||||
Map<String, TQueryStatistics> queryIdMap = beToQueryStatsMap.get(beId);
|
||||
beLastReportTime.put(beId, System.currentTimeMillis());
|
||||
if (queryIdMap == null) {
|
||||
queryIdMap = Maps.newConcurrentMap();
|
||||
queryIdMap.putAll(params.query_statistics_map);
|
||||
beToQueryStatsMap.put(beId, queryIdMap);
|
||||
} else {
|
||||
long currentTime = System.currentTimeMillis();
|
||||
for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
|
||||
queryIdMap.put(entry.getKey(), entry.getValue());
|
||||
queryLastReportTime.put(entry.getKey(), currentTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, TQueryStatistics> getQueryStatisticsMap() {
|
||||
// 1 merge query stats in all be
|
||||
Set<Long> beIdSet = beToQueryStatsMap.keySet();
|
||||
Map<String, TQueryStatistics> retQueryMap = Maps.newHashMap();
|
||||
for (Long beId : beIdSet) {
|
||||
Map<String, TQueryStatistics> currentQueryMap = beToQueryStatsMap.get(beId);
|
||||
Set<String> queryIdSet = currentQueryMap.keySet();
|
||||
for (String queryId : queryIdSet) {
|
||||
TQueryStatistics retQuery = retQueryMap.get(queryId);
|
||||
if (retQuery == null) {
|
||||
retQuery = new TQueryStatistics();
|
||||
retQueryMap.put(queryId, retQuery);
|
||||
}
|
||||
|
||||
TQueryStatistics curQueryStats = currentQueryMap.get(queryId);
|
||||
mergeQueryStatistics(retQuery, curQueryStats);
|
||||
}
|
||||
}
|
||||
|
||||
return retQueryMap;
|
||||
}
|
||||
|
||||
private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
|
||||
dst.scan_rows += src.scan_rows;
|
||||
dst.scan_bytes += src.scan_bytes;
|
||||
dst.cpu_ms += src.cpu_ms;
|
||||
if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
|
||||
dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
|
||||
}
|
||||
}
|
||||
|
||||
void clearReportTimeoutBeStatistics() {
|
||||
// 1 clear report timeout be
|
||||
Set<Long> beNeedToRemove = new HashSet<>();
|
||||
Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
|
||||
Long currentTime = System.currentTimeMillis();
|
||||
for (Long beId : currentBeIdSet) {
|
||||
Long lastReportTime = beLastReportTime.get(beId);
|
||||
if (lastReportTime != null
|
||||
&& currentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) {
|
||||
beNeedToRemove.add(beId);
|
||||
}
|
||||
}
|
||||
for (Long beId : beNeedToRemove) {
|
||||
beToQueryStatsMap.remove(beId);
|
||||
beLastReportTime.remove(beId);
|
||||
}
|
||||
|
||||
// 2 clear report timeout query
|
||||
Set<String> queryNeedToClear = new HashSet<>();
|
||||
Long newCurrentTime = System.currentTimeMillis();
|
||||
Set<String> queryLastReportTimeKeySet = queryLastReportTime.keySet();
|
||||
for (String queryId : queryLastReportTimeKeySet) {
|
||||
Long lastReportTime = queryLastReportTime.get(queryId);
|
||||
if (lastReportTime != null
|
||||
&& newCurrentTime - lastReportTime > Config.be_report_query_statistics_timeout_ms) {
|
||||
queryNeedToClear.add(queryId);
|
||||
}
|
||||
}
|
||||
|
||||
Set<Long> beIdSet = beToQueryStatsMap.keySet();
|
||||
for (String queryId : queryNeedToClear) {
|
||||
for (Long beId : beIdSet) {
|
||||
beToQueryStatsMap.get(beId).remove(queryId);
|
||||
}
|
||||
queryLastReportTime.remove(queryId);
|
||||
}
|
||||
}
|
||||
|
||||
private void queryAuditEventLogWriteLock() {
|
||||
queryAuditEventLock.writeLock().lock();
|
||||
}
|
||||
|
||||
private void queryAuditEventLogWriteUnlock() {
|
||||
queryAuditEventLock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user