[pick]Add audit log event queue size limit (#37914)
## Proposed changes pick #37786
This commit is contained in:
@ -70,12 +70,21 @@ public class AuditEventProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
public void handleAuditEvent(AuditEvent auditEvent) {
|
||||
public boolean handleAuditEvent(AuditEvent auditEvent) {
|
||||
return handleAuditEvent(auditEvent, false);
|
||||
}
|
||||
|
||||
public boolean handleAuditEvent(AuditEvent auditEvent, boolean ignoreQueueFullLog) {
|
||||
boolean isAddSucc = true;
|
||||
try {
|
||||
eventQueue.add(auditEvent);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("encounter exception when handle audit event, ignore", e);
|
||||
isAddSucc = false;
|
||||
if (!ignoreQueueFullLog) {
|
||||
LOG.warn("encounter exception when handle audit event {}, ignore", auditEvent.type, e);
|
||||
}
|
||||
}
|
||||
return isAddSucc;
|
||||
}
|
||||
|
||||
public class Worker implements Runnable {
|
||||
|
||||
@ -71,18 +71,33 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
|
||||
Map<String, TQueryStatistics> queryStatisticsMap = getQueryStatisticsMap();
|
||||
|
||||
// 2 log query audit
|
||||
List<AuditEvent> auditEventList = getQueryNeedAudit();
|
||||
for (AuditEvent auditEvent : auditEventList) {
|
||||
TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId);
|
||||
if (queryStats != null) {
|
||||
auditEvent.scanRows = queryStats.scan_rows;
|
||||
auditEvent.scanBytes = queryStats.scan_bytes;
|
||||
auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes;
|
||||
auditEvent.cpuTimeMs = queryStats.cpu_ms;
|
||||
auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
|
||||
auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
|
||||
try {
|
||||
int missedLogCount = 0;
|
||||
int succLogCount = 0;
|
||||
List<AuditEvent> auditEventList = getQueryNeedAudit();
|
||||
for (AuditEvent auditEvent : auditEventList) {
|
||||
TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId);
|
||||
if (queryStats != null) {
|
||||
auditEvent.scanRows = queryStats.scan_rows;
|
||||
auditEvent.scanBytes = queryStats.scan_bytes;
|
||||
auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes;
|
||||
auditEvent.cpuTimeMs = queryStats.cpu_ms;
|
||||
auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
|
||||
auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
|
||||
}
|
||||
boolean ret = Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent, true);
|
||||
if (!ret) {
|
||||
missedLogCount++;
|
||||
} else {
|
||||
succLogCount++;
|
||||
}
|
||||
}
|
||||
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
|
||||
if (missedLogCount > 0) {
|
||||
LOG.warn("discard audit event because of log queue is full, discard num : {}, succ num : {}",
|
||||
missedLogCount, succLogCount);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("exception happens when handleAuditEvent, ", t);
|
||||
}
|
||||
|
||||
// 3 clear beToQueryStatsMap when be report timeout
|
||||
@ -92,6 +107,12 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
|
||||
public void submitFinishQueryToAudit(AuditEvent event) {
|
||||
queryAuditEventLogWriteLock();
|
||||
try {
|
||||
if (queryAuditEventList.size() >= Config.audit_event_log_queue_size) {
|
||||
LOG.warn("audit log event queue size {} is full, this may cause audit log missed."
|
||||
+ "you can check whether qps is too high or reset audit_event_log_queue_size",
|
||||
queryAuditEventList.size());
|
||||
return;
|
||||
}
|
||||
event.pushToAuditLogQueueTime = System.currentTimeMillis();
|
||||
queryAuditEventList.add(event);
|
||||
} finally {
|
||||
|
||||
Reference in New Issue
Block a user