[Improvement](auditlog) add column catalog for audit log and audit log table (#26403)
This commit is contained in:
@ -87,6 +87,7 @@ create table doris_audit_db__.doris_audit_log_tbl__
|
||||
`time` datetime not null comment "Query start time",
|
||||
client_ip varchar(32) comment "Client IP",
|
||||
user varchar(64) comment "User name",
|
||||
catalog varchar(128) comment "Catalog of this query",
|
||||
db varchar(96) comment "Database of this query",
|
||||
state varchar(8) comment "Query result state. EOF, ERR, OK",
|
||||
error_code int comment "Error code of failing query.",
|
||||
@ -123,6 +124,7 @@ create table doris_audit_db__.doris_slow_log_tbl__
|
||||
`time` datetime not null comment "Query start time",
|
||||
client_ip varchar(32) comment "Client IP",
|
||||
user varchar(64) comment "User name",
|
||||
catalog varchar(128) comment "Catalog of this query",
|
||||
db varchar(96) comment "Database of this query",
|
||||
state varchar(8) comment "Query result state. EOF, ERR, OK",
|
||||
error_code int comment "Error code of failing query.",
|
||||
|
||||
@ -93,6 +93,7 @@ create table doris_audit_db__.doris_audit_log_tbl__
|
||||
`time` datetime not null comment "Query start time",
|
||||
client_ip varchar(32) comment "Client IP",
|
||||
user varchar(64) comment "User name",
|
||||
catalog varchar(128) comment "Catalog of this query",
|
||||
db varchar(96) comment "Database of this query",
|
||||
state varchar(8) comment "Query result state. EOF, ERR, OK",
|
||||
error_code int comment "Error code of failing query.",
|
||||
@ -129,6 +130,7 @@ create table doris_audit_db__.doris_slow_log_tbl__
|
||||
`time` datetime not null comment "Query start time",
|
||||
client_ip varchar(32) comment "Client IP",
|
||||
user varchar(64) comment "User name",
|
||||
catalog varchar(128) comment "Catalog of this query",
|
||||
db varchar(96) comment "Database of this query",
|
||||
state varchar(8) comment "Query result state. EOF, ERR, OK",
|
||||
error_code int comment "Error code of failing query.",
|
||||
|
||||
@ -56,6 +56,8 @@ public class AuditEvent {
|
||||
public String clientIp = "";
|
||||
@AuditField(value = "User")
|
||||
public String user = "";
|
||||
@AuditField(value = "Catalog")
|
||||
public String catalog = "";
|
||||
@AuditField(value = "Db")
|
||||
public String db = "";
|
||||
@AuditField(value = "State")
|
||||
@ -131,6 +133,11 @@ public class AuditEvent {
|
||||
return this;
|
||||
}
|
||||
|
||||
public AuditEventBuilder setCatalog(String catalog) {
|
||||
auditEvent.catalog = catalog;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AuditEventBuilder setDb(String db) {
|
||||
auditEvent.db = db;
|
||||
return this;
|
||||
|
||||
@ -62,6 +62,11 @@ public class AuditLogHelper {
|
||||
.setWorkloadGroup(ctx.getWorkloadGroupName())
|
||||
.setFuzzyVariables(!printFuzzyVariables ? "" : ctx.getSessionVariable().printFuzzyVariables());
|
||||
|
||||
// when doric fe is booting, current catalog may not be set
|
||||
if (ctx.getCurrentCatalog() != null) {
|
||||
ctx.getAuditEventBuilder().setCatalog(ctx.getCurrentCatalog().getName());
|
||||
}
|
||||
|
||||
if (ctx.getState().isQuery()) {
|
||||
MetricRepo.COUNTER_QUERY_ALL.increase(1L);
|
||||
MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L);
|
||||
|
||||
@ -163,6 +163,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
|
||||
logBuffer.append(longToTimeString(event.timestamp)).append("\t");
|
||||
logBuffer.append(event.clientIp).append("\t");
|
||||
logBuffer.append(event.user).append("\t");
|
||||
logBuffer.append(event.catalog).append("\t");
|
||||
logBuffer.append(event.db).append("\t");
|
||||
logBuffer.append(event.state).append("\t");
|
||||
logBuffer.append(event.errorCode).append("\t");
|
||||
|
||||
@ -71,7 +71,7 @@ public class DorisStreamLoader {
|
||||
|
||||
conn.addRequestProperty("label", label);
|
||||
conn.addRequestProperty("max_filter_ratio", "1.0");
|
||||
conn.addRequestProperty("columns", "query_id, `time`, client_ip, user, db, state, error_code, error_message, " +
|
||||
conn.addRequestProperty("columns", "query_id, `time`, client_ip, user, catalog, db, state, error_code, error_message, " +
|
||||
"query_time, scan_bytes, scan_rows, return_rows, stmt_id, is_query, frontend_ip, cpu_time_ms, sql_hash, " +
|
||||
"sql_digest, peak_memory_bytes, stmt");
|
||||
|
||||
@ -88,7 +88,7 @@ public class DorisStreamLoader {
|
||||
sb.append("-H \"").append("Expect\":").append("\"100-continue\" \\\n ");
|
||||
sb.append("-H \"").append("Content-Type\":").append("\"text/plain; charset=UTF-8\" \\\n ");
|
||||
sb.append("-H \"").append("max_filter_ratio\":").append("\"1.0\" \\\n ");
|
||||
sb.append("-H \"").append("columns\":").append("\"query_id, time, client_ip, user, db, state, error_code, " +
|
||||
sb.append("-H \"").append("columns\":").append("\"query_id, time, client_ip, user, catalog, db, state, error_code, " +
|
||||
"error_message, query_time, scan_bytes, scan_rows, return_rows, stmt_id, is_query, frontend_ip, " +
|
||||
"cpu_time_ms, sql_hash, sql_digest, peak_memory_bytes, stmt\" \\\n ");
|
||||
sb.append("\"").append(conn.getURL()).append("\"");
|
||||
|
||||
@ -181,6 +181,7 @@ def add_auditload_plugin():
|
||||
\`time\` datetime not null comment 'Query start time', \
|
||||
client_ip varchar(32) comment 'Client IP', \
|
||||
user varchar(64) comment 'User name', \
|
||||
catalog varchar(128) comment 'Catalog of this query', \
|
||||
db varchar(96) comment 'Database of this query', \
|
||||
state varchar(8) comment 'Query result state. EOF, ERR, OK', \
|
||||
query_time bigint comment 'Query execution time in millisecond', \
|
||||
|
||||
Reference in New Issue
Block a user