[enhancement](stats) audit for stats collection #24074

log stas collection sqls in audit log
This commit is contained in:
AKIRA
2023-09-11 09:26:12 +09:00
committed by GitHub
parent 71db844c64
commit 31bffdb5fc
5 changed files with 147 additions and 92 deletions

View File

@ -30,6 +30,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SqlBlockUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -224,6 +225,10 @@ public class SqlBlockRuleMgr implements Writable {
* Match SQL according to rules.
**/
public void matchSql(String originSql, String sqlHash, String user) throws AnalysisException {
if (ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().internalSession) {
return;
}
// match global rule
List<SqlBlockRule> globalRules =
nameToSqlBlockRuleMap.values().stream().filter(SqlBlockRule::getGlobal).collect(Collectors.toList());
@ -260,6 +265,9 @@ public class SqlBlockRuleMgr implements Writable {
**/
public void checkLimitations(Long partitionNum, Long tabletNum, Long cardinality, String user)
throws AnalysisException {
if (ConnectContext.get().getSessionVariable().internalSession) {
return;
}
// match global rule
List<SqlBlockRule> globalRules =
nameToSqlBlockRuleMap.values().stream().filter(SqlBlockRule::getGlobal).collect(Collectors.toList());

View File

@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.Queriable;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.service.FrontendOptions;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import org.apache.commons.codec.digest.DigestUtils;
public class AuditLogHelper {
public static void logAuditLog(ConnectContext ctx, String origStmt, StatementBase parsedStmt,
org.apache.doris.proto.Data.PQueryStatistics statistics, boolean printFuzzyVariables) {
origStmt = origStmt.replace("\n", " ");
// slow query
long endTime = System.currentTimeMillis();
long elapseMs = endTime - ctx.getStartTime();
SpanContext spanContext = Span.fromContext(Context.current()).getSpanContext();
ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
.setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
.setState(ctx.getState().toString())
.setErrorCode(ctx.getState().getErrorCode() == null ? 0 : ctx.getState().getErrorCode().getCode())
.setErrorMessage((ctx.getState().getErrorMessage() == null ? "" :
ctx.getState().getErrorMessage().replace("\n", " ").replace("\t", " ")))
.setQueryTime(elapseMs)
.setScanBytes(statistics == null ? 0 : statistics.getScanBytes())
.setScanRows(statistics == null ? 0 : statistics.getScanRows())
.setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
.setPeakMemoryBytes(statistics == null ? 0 : statistics.getMaxPeakMemoryBytes())
.setReturnRows(ctx.getReturnRows())
.setStmtId(ctx.getStmtId())
.setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId()))
.setTraceId(spanContext.isValid() ? spanContext.getTraceId() : "")
.setWorkloadGroup(ctx.getWorkloadGroupName())
.setFuzzyVariables(!printFuzzyVariables ? "" : ctx.getSessionVariable().printFuzzyVariables());
if (ctx.getState().isQuery()) {
MetricRepo.COUNTER_QUERY_ALL.increase(1L);
MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L);
if (ctx.getState().getStateType() == MysqlStateType.ERR
&& ctx.getState().getErrType() != QueryState.ErrType.ANALYSIS_ERR) {
// err query
MetricRepo.COUNTER_QUERY_ERR.increase(1L);
MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L);
} else if (ctx.getState().getStateType() == MysqlStateType.OK
|| ctx.getState().getStateType() == MysqlStateType.EOF) {
// ok query
MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs);
MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs);
if (elapseMs > Config.qe_slow_log_ms) {
String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest());
ctx.getAuditEventBuilder().setSqlDigest(sqlDigest);
}
}
ctx.getAuditEventBuilder().setIsQuery(true);
if (ctx.getQueryDetail() != null) {
ctx.getQueryDetail().setEventTime(endTime);
ctx.getQueryDetail().setEndTime(endTime);
ctx.getQueryDetail().setLatency(elapseMs);
ctx.getQueryDetail().setState(QueryDetail.QueryMemState.FINISHED);
QueryDetailQueue.addOrUpdateQueryDetail(ctx.getQueryDetail());
ctx.setQueryDetail(null);
}
} else {
ctx.getAuditEventBuilder().setIsQuery(false);
}
ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids);
ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress());
// We put origin query stmt at the end of audit log, for parsing the log more convenient.
if (!ctx.getState().isQuery() && (parsedStmt != null && parsedStmt.needAuditEncryption())) {
ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql());
} else {
if (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager()
&& ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) {
// INSERT INTO VALUES may be very long, so we only log at most 1K bytes.
int length = Math.min(1024, origStmt.length());
ctx.getAuditEventBuilder().setStmt(origStmt.substring(0, length));
} else {
ctx.getAuditEventBuilder().setStmt(origStmt);
}
}
if (!Env.getCurrentEnv().isMaster()) {
if (ctx.executor.isForwardToMaster()) {
ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus());
}
}
Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build());
}
}

View File

@ -22,7 +22,6 @@ import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.KillStmt;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.Queriable;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
@ -34,7 +33,6 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@ -56,17 +54,14 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.minidump.MinidumpUtils;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.proto.Data;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Strings;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
@ -279,84 +274,7 @@ public class ConnectProcessor {
private void auditAfterExec(String origStmt, StatementBase parsedStmt,
Data.PQueryStatistics statistics, boolean printFuzzyVariables) {
origStmt = origStmt.replace("\n", " ");
// slow query
long endTime = System.currentTimeMillis();
long elapseMs = endTime - ctx.getStartTime();
SpanContext spanContext = Span.fromContext(Context.current()).getSpanContext();
ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
.setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
.setState(ctx.getState().toString())
.setErrorCode(ctx.getState().getErrorCode() == null ? 0 : ctx.getState().getErrorCode().getCode())
.setErrorMessage((ctx.getState().getErrorMessage() == null ? "" :
ctx.getState().getErrorMessage().replace("\n", " ").replace("\t", " ")))
.setQueryTime(elapseMs)
.setScanBytes(statistics == null ? 0 : statistics.getScanBytes())
.setScanRows(statistics == null ? 0 : statistics.getScanRows())
.setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
.setPeakMemoryBytes(statistics == null ? 0 : statistics.getMaxPeakMemoryBytes())
.setReturnRows(ctx.getReturnRows())
.setStmtId(ctx.getStmtId())
.setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId()))
.setTraceId(spanContext.isValid() ? spanContext.getTraceId() : "")
.setWorkloadGroup(ctx.getWorkloadGroupName())
.setFuzzyVariables(!printFuzzyVariables ? "" : ctx.getSessionVariable().printFuzzyVariables());
if (ctx.getState().isQuery()) {
MetricRepo.COUNTER_QUERY_ALL.increase(1L);
MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L);
if (ctx.getState().getStateType() == MysqlStateType.ERR
&& ctx.getState().getErrType() != QueryState.ErrType.ANALYSIS_ERR) {
// err query
MetricRepo.COUNTER_QUERY_ERR.increase(1L);
MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L);
} else if (ctx.getState().getStateType() == MysqlStateType.OK
|| ctx.getState().getStateType() == MysqlStateType.EOF) {
// ok query
MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs);
MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs);
if (elapseMs > Config.qe_slow_log_ms) {
String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest());
ctx.getAuditEventBuilder().setSqlDigest(sqlDigest);
}
}
ctx.getAuditEventBuilder().setIsQuery(true);
if (ctx.getQueryDetail() != null) {
ctx.getQueryDetail().setEventTime(endTime);
ctx.getQueryDetail().setEndTime(endTime);
ctx.getQueryDetail().setLatency(elapseMs);
ctx.getQueryDetail().setState(QueryDetail.QueryMemState.FINISHED);
QueryDetailQueue.addOrUpdateQueryDetail(ctx.getQueryDetail());
ctx.setQueryDetail(null);
}
} else {
ctx.getAuditEventBuilder().setIsQuery(false);
}
ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids);
ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress());
// We put origin query stmt at the end of audit log, for parsing the log more convenient.
if (!ctx.getState().isQuery() && (parsedStmt != null && parsedStmt.needAuditEncryption())) {
ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql());
} else {
if (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager()
&& ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) {
// INSERT INTO VALUES may be very long, so we only log at most 1K bytes.
int length = Math.min(1024, origStmt.length());
ctx.getAuditEventBuilder().setStmt(origStmt.substring(0, length));
} else {
ctx.getAuditEventBuilder().setStmt(origStmt);
}
}
if (!Env.getCurrentEnv().isMaster()) {
if (ctx.executor.isForwardToMaster()) {
ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus());
}
}
Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build());
AuditLogHelper.logAuditLog(ctx, origStmt, parsedStmt, statistics, printFuzzyVariables);
}
// Process COM_QUERY statement,

View File

@ -2622,6 +2622,8 @@ public class StmtExecutor {
fetchResultSpan.end();
}
} finally {
AuditLogHelper.logAuditLog(context, originStmt.toString(), parsedStmt, getQueryStatisticsForAuditLog(),
true);
QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
}
}

View File

@ -22,6 +22,7 @@ import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
@ -188,7 +189,7 @@ public abstract class BaseAnalysisTask {
protected void setTaskStateToRunning() {
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(info, AnalysisState.RUNNING, "", System.currentTimeMillis());
.updateTaskStatus(info, AnalysisState.RUNNING, "", System.currentTimeMillis());
}
public void cancel() {
@ -228,8 +229,8 @@ public abstract class BaseAnalysisTask {
@Override
public String toString() {
return String.format("Job id [%d], Task id [%d], catalog [%s], db [%s], table [%s], column [%s]",
info.jobId, info.taskId, catalog.getName(), db.getFullName(), tbl.getName(),
col == null ? "TableRowCount" : col.getName());
info.jobId, info.taskId, catalog.getName(), db.getFullName(), tbl.getName(),
col == null ? "TableRowCount" : col.getName());
}
protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
@ -237,12 +238,18 @@ public abstract class BaseAnalysisTask {
return;
}
LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt());
stmtExecutor.execute();
QueryState queryState = stmtExecutor.getContext().getState();
if (queryState.getStateType().equals(MysqlStateType.ERR)) {
throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
info.catalogName, info.dbName, info.colName, stmtExecutor.getOriginStmt().toString(),
queryState.getErrorMessage()));
try {
stmtExecutor.execute();
QueryState queryState = stmtExecutor.getContext().getState();
if (queryState.getStateType().equals(MysqlStateType.ERR)) {
throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
info.catalogName, info.dbName, info.colName, stmtExecutor.getOriginStmt().toString(),
queryState.getErrorMessage()));
}
} finally {
AuditLogHelper.logAuditLog(stmtExecutor.getContext(), stmtExecutor.getOriginStmt().toString(),
stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(),
true);
}
}
}