Files
tidb/pkg/executor/adapter_slow_log.go

275 lines
9.1 KiB
Go

// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"bytes"
"context"
"strings"
"sync"
"time"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx/slowlogrule"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)
func init() {
variable.SlowLogRuleFieldAccessors[strings.ToLower(variable.SlowLogPlanDigest)] = variable.SlowLogFieldAccessor{
Parse: variable.ParseString,
Setter: func(_ context.Context, seVars *variable.SessionVars, items *variable.SlowQueryLogItems) {
_, planDigest := GetPlanDigest(seVars.StmtCtx)
items.PlanDigest = planDigest.String()
},
Match: func(_ *variable.SessionVars, items *variable.SlowQueryLogItems, threshold any) bool {
return variable.MatchEqual(threshold, strings.ToLower(items.PlanDigest))
},
}
}
var sampleLoggerFactory = logutil.SampleLoggerFactory(time.Minute, 1, zap.String(logutil.LogFieldCategory, "slow log"))
func mergeConditionFields(dst, src map[string]struct{}) {
for k := range src {
dst[k] = struct{}{}
}
}
func updateAllRuleFields(globalRules *slowlogrule.GlobalSlowLogRules, seVars *variable.SessionVars) {
if seVars.SlowLogRules.NeedUpdateEffectiveFields ||
globalRules.RawRulesHash != seVars.SlowLogRules.GlobalRawRulesHash {
allRuleFields := make(map[string]struct{})
if seVars.SlowLogRules.SlowLogRules != nil {
mergeConditionFields(allRuleFields, seVars.SlowLogRules.Fields)
}
if specificSessionRules, ok := globalRules.RulesMap[int64(seVars.ConnectionID)]; ok {
mergeConditionFields(allRuleFields, specificSessionRules.Fields)
}
if clusterRules, ok := globalRules.RulesMap[variable.UnsetConnID]; ok {
mergeConditionFields(allRuleFields, clusterRules.Fields)
}
seVars.SlowLogRules.EffectiveFields = allRuleFields
seVars.SlowLogRules.GlobalRawRulesHash = globalRules.RawRulesHash
seVars.SlowLogRules.NeedUpdateEffectiveFields = false
}
}
var slowQueryLogItemsPool = sync.Pool{
New: func() any { return &variable.SlowQueryLogItems{} },
}
func getSlowLogItems() *variable.SlowQueryLogItems {
return slowQueryLogItemsPool.Get().(*variable.SlowQueryLogItems)
}
func putSlowLogItems(items *variable.SlowQueryLogItems) {
if items == nil {
return
}
*items = variable.SlowQueryLogItems{}
slowQueryLogItemsPool.Put(items)
}
// PrepareSlowLogItemsForRules builds a SlowQueryLogItems containing only the fields referenced by the effective slow log rules.
// These pre-collected fields are later used for matching SQL execution details against the rules.
func PrepareSlowLogItemsForRules(ctx context.Context, globalRules *slowlogrule.GlobalSlowLogRules, seVars *variable.SessionVars) *variable.SlowQueryLogItems {
updateAllRuleFields(globalRules, seVars)
if len(seVars.SlowLogRules.EffectiveFields) == 0 {
return nil
}
var items *variable.SlowQueryLogItems
for field := range seVars.SlowLogRules.EffectiveFields {
if gs, ok := variable.SlowLogRuleFieldAccessors[field]; ok && gs.Setter != nil {
if items == nil {
items = getSlowLogItems()
}
gs.Setter(ctx, seVars, items)
}
}
return items
}
// Match checks whether the given SlowQueryLogItems satisfies any of the
// current session's slow log trigger rules.
//
// Matching is evaluated in two levels of logical relationship:
// - Rules are combined with OR: if any rule is satisfied, the function returns true.
// - Conditions inside a rule are combined with AND: all conditions must be met for that rule.
//
// Returns true if any rule matches, false otherwise.
func Match(seVars *variable.SessionVars, items *variable.SlowQueryLogItems, rules *slowlogrule.SlowLogRules) bool {
if rules == nil {
return false
}
// Or logical relationship
for _, rule := range rules.Rules {
match := true
// And logical relationship
for _, condition := range rule.Conditions {
accessor := variable.SlowLogRuleFieldAccessors[strings.ToLower(condition.Field)]
if ok := accessor.Match(seVars, items, condition.Threshold); !ok {
match = false
break
}
}
if match {
return true
}
}
return false
}
// ShouldWriteSlowLog determines whether the current query should be written to the slow log.
// The decision is based on the following rule hierarchy:
//
// 1. Session-level rules: if defined, they take precedence.
// 2. Global-level rules:
// - If a rule specifies a ConnID that matches the current session, it is applied.
// - If a rule does not specify ConnID (global default), it is also applied.
//
// 3. All applicable rules are combined with logical OR.
//
// Returns true if any rule matches, otherwise false.
func ShouldWriteSlowLog(globalRules *slowlogrule.GlobalSlowLogRules, seVars *variable.SessionVars, items *variable.SlowQueryLogItems) bool {
if isMatched := Match(seVars, items, seVars.SlowLogRules.SlowLogRules); isMatched {
return isMatched
}
if specificSessionRules, ok := globalRules.RulesMap[int64(seVars.ConnectionID)]; ok {
if isMatched := Match(seVars, items, specificSessionRules); isMatched {
return isMatched
}
}
if clusterRules, ok := globalRules.RulesMap[variable.UnsetConnID]; ok {
if isMatched := Match(seVars, items, clusterRules); isMatched {
return isMatched
}
}
return false
}
// CompleteSlowLogItemsForRules fills in the remaining fields of SlowQueryLogItems
// that were not required by the session's effective rule set.
// It's exporting for testing.
func CompleteSlowLogItemsForRules(ctx context.Context, seVars *variable.SessionVars, items *variable.SlowQueryLogItems) {
for field, sg := range variable.SlowLogRuleFieldAccessors {
if _, ok := seVars.SlowLogRules.EffectiveFields[field]; ok {
continue
}
if sg.Setter != nil {
sg.Setter(ctx, seVars, items)
}
}
}
// SetSlowLogItems fills the remaining fields of SlowQueryLogItems after SQL execution.
func SetSlowLogItems(a *ExecStmt, txnTS uint64, hasMoreResults bool, items *variable.SlowQueryLogItems) {
if items == nil {
return
}
CompleteSlowLogItemsForRules(a.GoCtx, a.Ctx.GetSessionVars(), items)
sessVars := a.Ctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
var indexNames string
if len(stmtCtx.IndexNames) > 0 {
// remove duplicate index.
idxMap := make(map[string]struct{})
buf := bytes.NewBuffer(make([]byte, 0, 4))
buf.WriteByte('[')
for _, idx := range stmtCtx.IndexNames {
_, ok := idxMap[idx]
if ok {
continue
}
idxMap[idx] = struct{}{}
if buf.Len() > 1 {
buf.WriteByte(',')
}
buf.WriteString(idx)
}
buf.WriteByte(']')
indexNames = buf.String()
}
var ruDetails *util.RUDetails
if ruDetailsVal := a.GoCtx.Value(util.RUDetailsCtxKey); ruDetailsVal != nil {
ruDetails = ruDetailsVal.(*util.RUDetails)
} else {
ruDetails = util.NewRUDetails()
}
binaryPlan := ""
if variable.GenerateBinaryPlan.Load() {
binaryPlan = getBinaryPlan(a.Ctx)
if len(binaryPlan) > 0 {
binaryPlan = variable.SlowLogBinaryPlanPrefix + binaryPlan + variable.SlowLogPlanSuffix
}
}
var keyspaceID uint32
keyspaceName := keyspace.GetKeyspaceNameBySettings()
if !keyspace.IsKeyspaceNameEmpty(keyspaceName) {
keyspaceID = uint32(a.Ctx.GetStore().GetCodec().GetKeyspaceID())
}
if txnTS == 0 {
// TODO: txnTS maybe ambiguous, consider logging stale-read-ts with a new field in the slow log.
txnTS = sessVars.TxnCtx.StaleReadTs
}
items.TxnTS = txnTS
items.KeyspaceName = keyspaceName
items.KeyspaceID = keyspaceID
items.SQL = FormatSQL(a.GetTextToLog(true)).String()
items.IndexNames = indexNames
items.Plan = getPlanTree(stmtCtx)
items.BinaryPlan = binaryPlan
items.Prepared = a.isPreparedStmt
items.HasMoreResults = hasMoreResults
items.PlanFromCache = sessVars.FoundInPlanCache
items.PlanFromBinding = sessVars.FoundInBinding
items.ResultRows = stmtCtx.GetResultRowsCount()
items.IsExplicitTxn = sessVars.TxnCtx.IsExplicit
items.IsWriteCacheTable = stmtCtx.WaitLockLeaseTime > 0
items.UsedStats = stmtCtx.GetUsedStatsInfo(false)
items.IsSyncStatsFailed = stmtCtx.IsSyncStatsFailed
items.Warnings = variable.CollectWarningsForSlowLog(stmtCtx)
items.ResourceGroupName = stmtCtx.ResourceGroupName
items.RUDetails = ruDetails
items.CPUUsages = sessVars.SQLCPUUsages.GetCPUUsages()
items.StorageKV = stmtCtx.IsTiKV.Load()
items.StorageMPP = stmtCtx.IsTiFlash.Load()
items.MemArbitration = stmtCtx.MemTracker.MemArbitration().Seconds()
if a.retryCount > 0 {
items.ExecRetryTime = items.TimeTotal - sessVars.DurationParse - sessVars.DurationCompile - time.Since(a.retryStartTime)
}
if _, ok := a.StmtNode.(*ast.CommitStmt); ok && sessVars.PrevStmt != nil {
items.PrevStmt = sessVars.PrevStmt.String()
}
}