diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index b5629d6217..5a811fddd6 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2727,6 +2727,10 @@ public class Config extends ConfigBase { @ConfField public static String cloud_sql_server_cluster_id = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER"; + //* audit_event_log_queue_size = qps * query_audit_log_timeout_ms + @ConfField(mutable = true) + public static int audit_event_log_queue_size = 250000; + @ConfField(description = {"Stream_Load 导入时,label 被限制的最大长度", "Stream_Load When importing, the maximum length of label is limited"}) public static int label_regex_length = 128; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java index ed0d028315..e2c45ae39a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditEventProcessor.java @@ -70,12 +70,21 @@ public class AuditEventProcessor { } } - public void handleAuditEvent(AuditEvent auditEvent) { + public boolean handleAuditEvent(AuditEvent auditEvent) { + return handleAuditEvent(auditEvent, false); + } + + public boolean handleAuditEvent(AuditEvent auditEvent, boolean ignoreQueueFullLog) { + boolean isAddSucc = true; try { eventQueue.add(auditEvent); } catch (Exception e) { - LOG.warn("encounter exception when handle audit event, ignore", e); + isAddSucc = false; + if (!ignoreQueueFullLog) { + LOG.warn("encounter exception when handle audit event {}, ignore", auditEvent.type, e); + } } + return isAddSucc; } public class Worker implements Runnable { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java index cb4b8a5f2e..f3ad56a33c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java @@ -71,18 +71,33 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon { Map queryStatisticsMap = getQueryStatisticsMap(); // 2 log query audit - List auditEventList = getQueryNeedAudit(); - for (AuditEvent auditEvent : auditEventList) { - TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId); - if (queryStats != null) { - auditEvent.scanRows = queryStats.scan_rows; - auditEvent.scanBytes = queryStats.scan_bytes; - auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes; - auditEvent.cpuTimeMs = queryStats.cpu_ms; - auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes; - auditEvent.shuffleSendRows = queryStats.shuffle_send_rows; + try { + int missedLogCount = 0; + int succLogCount = 0; + List auditEventList = getQueryNeedAudit(); + for (AuditEvent auditEvent : auditEventList) { + TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId); + if (queryStats != null) { + auditEvent.scanRows = queryStats.scan_rows; + auditEvent.scanBytes = queryStats.scan_bytes; + auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes; + auditEvent.cpuTimeMs = queryStats.cpu_ms; + auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes; + auditEvent.shuffleSendRows = queryStats.shuffle_send_rows; + } + boolean ret = Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent, true); + if (!ret) { + missedLogCount++; + } else { + succLogCount++; + } } - Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent); + if (missedLogCount > 0) { + LOG.warn("discard audit event because of log queue is full, discard num : {}, succ num : {}", + missedLogCount, succLogCount); + } + } catch (Throwable t) { + LOG.warn("exception happens when handleAuditEvent, ", t); } // 3 clear beToQueryStatsMap when be report timeout @@ -92,6 +107,12 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon { public void submitFinishQueryToAudit(AuditEvent event) { queryAuditEventLogWriteLock(); try { + if (queryAuditEventList.size() >= Config.audit_event_log_queue_size) { + LOG.warn("audit log event queue size {} is full, this may cause audit log missed." + + "you can check whether qps is too high or reset audit_event_log_queue_size", + queryAuditEventList.size()); + return; + } event.pushToAuditLogQueueTime = System.currentTimeMillis(); queryAuditEventList.add(event); } finally {