Files
tidb/pkg/util/memoryusagealarm/memoryusagealarm.go
2025-10-30 13:13:49 +00:00

468 lines
16 KiB
Go

// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memoryusagealarm
import (
"cmp"
"fmt"
"os"
"path/filepath"
"runtime"
rpprof "runtime/pprof"
"slices"
"strings"
"sync/atomic"
"time"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/session/sessmgr"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/disk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/plancodec"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// ConfigProvider provides memory usage alarm configuration values
type ConfigProvider interface {
// GetMemoryUsageAlarmRatio returns the ratio of memory usage that triggers an alarm
GetMemoryUsageAlarmRatio() float64
// GetMemoryUsageAlarmKeepRecordNum returns the number of alarm records to keep
GetMemoryUsageAlarmKeepRecordNum() int64
// GetLogDir returns the directory for storing logs
GetLogDir() string
// GetComponentName returns the name of the component (e.g. "tidb-server" or "br")
GetComponentName() string
}
// TiDBConfigProvider implements ConfigProvider using TiDB's vardef variables
type TiDBConfigProvider struct{}
// GetMemoryUsageAlarmRatio returns the ratio of memory usage that triggers an alarm.
// When memory usage exceeds this ratio of the total memory limit (or system memory if no limit),
// the memory monitor will dump profiles and trigger OOM-related actions.
func (*TiDBConfigProvider) GetMemoryUsageAlarmRatio() float64 {
return vardef.MemoryUsageAlarmRatio.Load()
}
// GetMemoryUsageAlarmKeepRecordNum returns the number of alarm records to keep.
// When the number of records exceeds this limit, older records will be deleted.
func (*TiDBConfigProvider) GetMemoryUsageAlarmKeepRecordNum() int64 {
return vardef.MemoryUsageAlarmKeepRecordNum.Load()
}
// GetLogDir returns the directory for storing memory profiles and alarm records.
func (*TiDBConfigProvider) GetLogDir() string {
logDir, _ := filepath.Split(config.GetGlobalConfig().Log.File.Filename)
return logDir
}
// GetComponentName returns the name of the component for logging and metrics.
// This helps identify which component triggered the memory alarm.
func (*TiDBConfigProvider) GetComponentName() string {
return "tidb-server"
}
// Handle is the handler for memory usage alarm.
type Handle struct {
exitCh chan struct{}
sm atomic.Pointer[sessmgr.Manager]
configProvider ConfigProvider
}
// NewMemoryUsageAlarmHandle builds a memory usage alarm handler.
func NewMemoryUsageAlarmHandle(exitCh chan struct{}, provider ConfigProvider) *Handle {
return &Handle{
exitCh: exitCh,
configProvider: provider,
}
}
// SetSessionManager sets the Manager which is used to fetching the info
// of all active sessions.
func (eqh *Handle) SetSessionManager(sm sessmgr.Manager) *Handle {
eqh.sm.Store(&sm)
return eqh
}
// Run starts a memory usage alarm goroutine at the start time of the server.
func (eqh *Handle) Run() {
// use 100ms as tickInterval temply, may use given interval or use defined variable later
tickInterval := time.Millisecond * time.Duration(100)
ticker := time.NewTicker(tickInterval)
defer ticker.Stop()
sm := eqh.sm.Load()
record := &memoryUsageAlarm{
configProvider: eqh.configProvider,
}
for {
select {
case <-ticker.C:
record.alarm4ExcessiveMemUsage(*sm)
case <-eqh.exitCh:
return
}
}
}
type memoryUsageAlarm struct {
lastCheckTime time.Time
lastUpdateVariableTime time.Time
err error
configProvider ConfigProvider
baseRecordDir string
lastRecordDirName []string
lastRecordMemUsed uint64
memoryUsageAlarmRatio float64
memoryUsageAlarmKeepRecordNum int64
serverMemoryLimit uint64
isServerMemoryLimitSet bool
initialized bool
}
func (record *memoryUsageAlarm) updateVariable() {
if time.Since(record.lastUpdateVariableTime) < 60*time.Second {
return
}
record.memoryUsageAlarmRatio = record.configProvider.GetMemoryUsageAlarmRatio()
record.memoryUsageAlarmKeepRecordNum = record.configProvider.GetMemoryUsageAlarmKeepRecordNum()
record.serverMemoryLimit = memory.ServerMemoryLimit.Load()
if record.serverMemoryLimit != 0 {
record.isServerMemoryLimitSet = true
} else {
record.serverMemoryLimit, record.err = memory.MemTotal()
if record.err != nil {
logutil.BgLogger().Error("get system total memory fail", zap.Error(record.err))
return
}
record.isServerMemoryLimitSet = false
}
record.lastUpdateVariableTime = time.Now()
}
func (record *memoryUsageAlarm) initMemoryUsageAlarmRecord() {
record.lastCheckTime = time.Time{}
record.lastUpdateVariableTime = time.Time{}
record.updateVariable()
tidbLogDir := record.configProvider.GetLogDir()
record.baseRecordDir = filepath.Join(tidbLogDir, "oom_record")
if record.err = disk.CheckAndCreateDir(record.baseRecordDir); record.err != nil {
return
}
// Read last records
recordDirs, err := os.ReadDir(record.baseRecordDir)
if err != nil {
record.err = err
return
}
for _, dir := range recordDirs {
name := filepath.Join(record.baseRecordDir, dir.Name())
if strings.Contains(dir.Name(), "record") {
record.lastRecordDirName = append(record.lastRecordDirName, name)
}
}
record.initialized = true
}
// If Performance.ServerMemoryQuota is set, use `ServerMemoryQuota * MemoryUsageAlarmRatio` to check oom risk.
// If Performance.ServerMemoryQuota is not set, use `system total memory size * MemoryUsageAlarmRatio` to check oom risk.
func (record *memoryUsageAlarm) alarm4ExcessiveMemUsage(sm sessmgr.Manager) {
if memory.UsingGlobalMemArbitration() {
return
}
if !record.initialized {
record.initMemoryUsageAlarmRecord()
if record.err != nil {
return
}
} else {
record.updateVariable()
}
if record.memoryUsageAlarmRatio <= 0.0 || record.memoryUsageAlarmRatio >= 1.0 {
return
}
var memoryUsage uint64
instanceStats := memory.ReadMemStats()
if record.isServerMemoryLimitSet {
memoryUsage = instanceStats.HeapAlloc
} else {
memoryUsage, record.err = memory.MemUsed()
if record.err != nil {
logutil.BgLogger().Error("get system memory usage fail", zap.Error(record.err))
return
}
}
// TODO: Consider NextGC to record SQLs.
if needRecord, reason := record.needRecord(memoryUsage); needRecord {
record.lastCheckTime = time.Now()
record.lastRecordMemUsed = memoryUsage
record.doRecord(memoryUsage, instanceStats.HeapAlloc, sm, reason)
record.tryRemoveRedundantRecords()
}
}
// AlarmReason implements alarm reason.
type AlarmReason uint
const (
// GrowTooFast is the reason that memory increasing too fast.
GrowTooFast AlarmReason = iota
// ExceedAlarmRatio is the reason that memory used exceed threshold.
ExceedAlarmRatio
// NoReason means no alarm
NoReason
)
func (reason AlarmReason) String() string {
return [...]string{"memory usage grows too fast", "memory usage exceeds alarm ratio", "no reason"}[reason]
}
func (record *memoryUsageAlarm) needRecord(memoryUsage uint64) (bool, AlarmReason) {
// At least 60 seconds between two recordings that memory usage is less than threshold (default 70% system memory).
// If the memory is still exceeded, only records once.
// If the memory used ratio recorded this time is 0.1 higher than last time, we will force record this time.
if float64(memoryUsage) <= float64(record.serverMemoryLimit)*record.memoryUsageAlarmRatio {
return false, NoReason
}
interval := time.Since(record.lastCheckTime)
memDiff := int64(memoryUsage) - int64(record.lastRecordMemUsed)
if interval > 60*time.Second {
return true, ExceedAlarmRatio
}
if float64(memDiff) > 0.1*float64(record.serverMemoryLimit) {
return true, GrowTooFast
}
return false, NoReason
}
func (record *memoryUsageAlarm) doRecord(memUsage uint64, instanceMemoryUsage uint64, sm sessmgr.Manager, alarmReason AlarmReason) {
fields := make([]zap.Field, 0, 6)
componentName := record.configProvider.GetComponentName()
fields = append(fields, zap.Bool(fmt.Sprintf("is %s_memory_limit set", componentName), record.isServerMemoryLimitSet))
if record.isServerMemoryLimitSet {
fields = append(fields, zap.Uint64(fmt.Sprintf("%s_memory_limit", componentName), record.serverMemoryLimit))
fields = append(fields, zap.Uint64(fmt.Sprintf("%s memory usage", componentName), memUsage))
} else {
fields = append(fields, zap.Uint64("system memory total", record.serverMemoryLimit))
fields = append(fields, zap.Uint64("system memory usage", memUsage))
fields = append(fields, zap.Uint64(fmt.Sprintf("%s memory usage", componentName), instanceMemoryUsage))
}
fields = append(fields, zap.Float64("memory-usage-alarm-ratio", record.memoryUsageAlarmRatio))
fields = append(fields, zap.String("record path", record.baseRecordDir))
logutil.BgLogger().Warn(fmt.Sprintf("%s has the risk of OOM because of %s. Running profiles will be recorded in record path", componentName, alarmReason.String()), fields...)
recordDir := filepath.Join(record.baseRecordDir, "record"+record.lastCheckTime.Format(time.RFC3339))
if record.err = disk.CheckAndCreateDir(recordDir); record.err != nil {
return
}
record.lastRecordDirName = append(record.lastRecordDirName, recordDir)
if sm != nil {
if record.err = record.recordSQL(sm, recordDir); record.err != nil {
return
}
}
if record.err = record.recordProfile(recordDir); record.err != nil {
return
}
}
func (record *memoryUsageAlarm) tryRemoveRedundantRecords() {
filename := &record.lastRecordDirName
for len(*filename) > int(record.memoryUsageAlarmKeepRecordNum) {
err := os.RemoveAll((*filename)[0])
if err != nil {
logutil.BgLogger().Error("remove temp files failed", zap.Error(err))
}
*filename = (*filename)[1:]
}
}
func getPlanString(info *sessmgr.ProcessInfo) string {
var buf strings.Builder
rows, _ := plancodec.DecodeBinaryPlan4Connection(info.BriefBinaryPlan, types.ExplainFormatROW, true)
buf.WriteString(fmt.Sprintf("|%v|%v|%v|%v|%v|", "id", "estRows", "task", "access object", "operator info"))
for _, row := range rows {
buf.WriteString("\n|")
for _, col := range row {
buf.WriteString(fmt.Sprintf("%v|", col))
}
}
return buf.String()
}
func (record *memoryUsageAlarm) printTop10SqlInfo(pinfo []*sessmgr.ProcessInfo, f *os.File) {
if _, err := f.WriteString("The 10 SQLs with the most memory usage for OOM analysis\n"); err != nil {
logutil.BgLogger().Error("write top 10 memory sql info fail", zap.Error(err))
}
memBuf := record.getTop10SqlInfoByMemoryUsage(pinfo)
if _, err := f.WriteString(memBuf.String()); err != nil {
logutil.BgLogger().Error("write top 10 memory sql info fail", zap.Error(err))
}
if _, err := f.WriteString("The 10 SQLs with the most time usage for OOM analysis\n"); err != nil {
logutil.BgLogger().Error("write top 10 time cost sql info fail", zap.Error(err))
}
costBuf := record.getTop10SqlInfoByCostTime(pinfo)
if _, err := f.WriteString(costBuf.String()); err != nil {
logutil.BgLogger().Error("write top 10 time cost sql info fail", zap.Error(err))
}
}
func (record *memoryUsageAlarm) getTop10SqlInfo(cmp func(i, j *sessmgr.ProcessInfo) int, pinfo []*sessmgr.ProcessInfo) strings.Builder {
slices.SortFunc(pinfo, cmp)
list := pinfo
var buf strings.Builder
oomAction := vardef.OOMAction.Load()
serverMemoryLimit := memory.ServerMemoryLimit.Load()
for i, totalCnt := 0, 10; i < len(list) && totalCnt > 0; i++ {
info := list[i]
buf.WriteString(fmt.Sprintf("SQL %v: \n", i))
fields := util.GenLogFields(record.lastCheckTime.Sub(info.Time), info, false)
if fields == nil {
continue
}
fields = append(fields, zap.String("tidb_mem_oom_action", oomAction))
fields = append(fields, zap.Uint64("tidb_server_memory_limit", serverMemoryLimit))
fields = append(fields, zap.Int64("tidb_mem_quota_query", info.OOMAlarmVariablesInfo.SessionMemQuotaQuery))
fields = append(fields, zap.Int("tidb_analyze_version", info.OOMAlarmVariablesInfo.SessionAnalyzeVersion))
fields = append(fields, zap.Bool("tidb_enable_rate_limit_action", info.OOMAlarmVariablesInfo.SessionEnabledRateLimitAction))
fields = append(fields, zap.String("current_analyze_plan", getPlanString(info)))
for _, field := range fields {
switch field.Type {
case zapcore.StringType:
fmt.Fprintf(&buf, "%v: %v", field.Key, field.String)
case zapcore.Uint8Type, zapcore.Uint16Type, zapcore.Uint32Type, zapcore.Uint64Type:
fmt.Fprintf(&buf, "%v: %v", field.Key, uint64(field.Integer))
case zapcore.Int8Type, zapcore.Int16Type, zapcore.Int32Type, zapcore.Int64Type:
fmt.Fprintf(&buf, "%v: %v", field.Key, field.Integer)
case zapcore.BoolType:
fmt.Fprintf(&buf, "%v: %v", field.Key, field.Integer == 1)
}
buf.WriteString("\n")
}
totalCnt--
}
buf.WriteString("\n")
return buf
}
func (record *memoryUsageAlarm) getTop10SqlInfoByMemoryUsage(pinfo []*sessmgr.ProcessInfo) strings.Builder {
return record.getTop10SqlInfo(func(i, j *sessmgr.ProcessInfo) int {
return cmp.Compare(j.MemTracker.MaxConsumed(), i.MemTracker.MaxConsumed())
}, pinfo)
}
func (record *memoryUsageAlarm) getTop10SqlInfoByCostTime(pinfo []*sessmgr.ProcessInfo) strings.Builder {
return record.getTop10SqlInfo(func(i, j *sessmgr.ProcessInfo) int {
return i.Time.Compare(j.Time)
}, pinfo)
}
func (record *memoryUsageAlarm) recordSQL(sm sessmgr.Manager, recordDir string) error {
processInfo := sm.ShowProcessList()
pinfo := make([]*sessmgr.ProcessInfo, 0, len(processInfo))
for _, info := range processInfo {
if len(info.Info) != 0 {
pinfo = append(pinfo, info)
}
}
fileName := filepath.Join(recordDir, "running_sql")
f, err := os.Create(fileName)
if err != nil {
logutil.BgLogger().Error("create oom record file fail", zap.Error(err))
return err
}
defer func() {
err := f.Close()
if err != nil {
logutil.BgLogger().Error("close oom record file fail", zap.Error(err))
}
}()
record.printTop10SqlInfo(pinfo, f)
return nil
}
type item struct {
Name string
Debug int
}
func (*memoryUsageAlarm) recordProfile(recordDir string) error {
items := []item{
{Name: "heap"},
// `goroutine` profile is not recorded, but they'll retry multiple times to extend the profile buffer size,
// which may cause long STW pauses. We'll use `recordGoroutineProfile` instead to allocate a large buffer
// at the beginning.
// {Name: "goroutine", Debug: 2},
}
for _, item := range items {
if err := write(item, recordDir); err != nil {
return err
}
}
return recordGoroutineProfile(recordDir)
}
func recordGoroutineProfile(recordDir string) error {
itemName := "goroutine"
fileName := filepath.Join(recordDir, itemName)
f, err := os.Create(fileName)
if err != nil {
logutil.BgLogger().Error(fmt.Sprintf("create %v profile file fail", itemName), zap.Error(err))
return err
}
buf := make([]byte, 1<<26) // 64MB buffer
n := runtime.Stack(buf, true)
if n >= len(buf) {
logutil.BgLogger().Warn("goroutine stack trace is too large, truncating", zap.Int("size", n))
}
_, err = f.Write(buf[:n])
if err != nil {
logutil.BgLogger().Error(fmt.Sprintf("write %v profile file fail", itemName), zap.Error(err))
}
return err
}
func write(item item, recordDir string) error {
fileName := filepath.Join(recordDir, item.Name)
f, err := os.Create(fileName)
if err != nil {
logutil.BgLogger().Error(fmt.Sprintf("create %v profile file fail", item.Name), zap.Error(err))
return err
}
p := rpprof.Lookup(item.Name)
err = p.WriteTo(f, item.Debug)
if err != nil {
logutil.BgLogger().Error(fmt.Sprintf("write %v profile file fail", item.Name), zap.Error(err))
return err
}
//nolint: revive
defer func() {
err := f.Close()
if err != nil {
logutil.BgLogger().Error(fmt.Sprintf("close %v profile file fail", item.Name), zap.Error(err))
}
}()
return nil
}