Files
tidb/pkg/util/execdetails/runtime_stats.go

1000 lines
30 KiB
Go

// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package execdetails
import (
"bytes"
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/util"
)
const (
// TpBasicRuntimeStats is the tp for BasicRuntimeStats.
TpBasicRuntimeStats int = iota
// TpRuntimeStatsWithCommit is the tp for RuntimeStatsWithCommit.
TpRuntimeStatsWithCommit
// TpRuntimeStatsWithConcurrencyInfo is the tp for RuntimeStatsWithConcurrencyInfo.
TpRuntimeStatsWithConcurrencyInfo
// TpSnapshotRuntimeStats is the tp for SnapshotRuntimeStats.
TpSnapshotRuntimeStats
// TpHashJoinRuntimeStats is the tp for HashJoinRuntimeStats.
TpHashJoinRuntimeStats
// TpHashJoinRuntimeStatsV2 is the tp for hashJoinRuntimeStatsV2.
TpHashJoinRuntimeStatsV2
// TpIndexLookUpJoinRuntimeStats is the tp for IndexLookUpJoinRuntimeStats.
TpIndexLookUpJoinRuntimeStats
// TpRuntimeStatsWithSnapshot is the tp for RuntimeStatsWithSnapshot.
TpRuntimeStatsWithSnapshot
// TpJoinRuntimeStats is the tp for JoinRuntimeStats.
TpJoinRuntimeStats
// TpSelectResultRuntimeStats is the tp for SelectResultRuntimeStats.
TpSelectResultRuntimeStats
// TpInsertRuntimeStat is the tp for InsertRuntimeStat
TpInsertRuntimeStat
// TpIndexLookUpRunTimeStats is the tp for IndexLookUpRunTimeStats
TpIndexLookUpRunTimeStats
// TpSlowQueryRuntimeStat is the tp for SlowQueryRuntimeStat
TpSlowQueryRuntimeStat
// TpHashAggRuntimeStat is the tp for HashAggRuntimeStat
TpHashAggRuntimeStat
// TpIndexMergeRunTimeStats is the tp for IndexMergeRunTimeStats
TpIndexMergeRunTimeStats
// TpBasicCopRunTimeStats is the tp for BasicCopRunTimeStats
TpBasicCopRunTimeStats
// TpUpdateRuntimeStats is the tp for UpdateRuntimeStats
TpUpdateRuntimeStats
// TpFKCheckRuntimeStats is the tp for FKCheckRuntimeStats
TpFKCheckRuntimeStats
// TpFKCascadeRuntimeStats is the tp for FKCascadeRuntimeStats
TpFKCascadeRuntimeStats
// TpRURuntimeStats is the tp for RURuntimeStats
TpRURuntimeStats
)
// RuntimeStats is used to express the executor runtime information.
type RuntimeStats interface {
String() string
Merge(RuntimeStats)
Clone() RuntimeStats
Tp() int
}
type basicCopRuntimeStats struct {
loop int32
rows int64
threads int32
procTimes Percentile[Duration]
// executor extra infos
tiflashStats *TiflashStats
}
// String implements the RuntimeStats interface.
func (e *basicCopRuntimeStats) String() string {
buf := bytes.NewBuffer(make([]byte, 0, 16))
buf.WriteString("time:")
buf.WriteString(FormatDuration(time.Duration(e.procTimes.sumVal)))
buf.WriteString(", loops:")
buf.WriteString(strconv.Itoa(int(e.loop)))
if e.tiflashStats != nil {
buf.WriteString(", threads:")
buf.WriteString(strconv.Itoa(int(e.threads)))
if !e.tiflashStats.waitSummary.CanBeIgnored() {
buf.WriteString(", ")
buf.WriteString(e.tiflashStats.waitSummary.String())
}
if !e.tiflashStats.networkSummary.Empty() {
buf.WriteString(", ")
buf.WriteString(e.tiflashStats.networkSummary.String())
}
buf.WriteString(", ")
buf.WriteString(e.tiflashStats.scanContext.String())
}
return buf.String()
}
// Clone implements the RuntimeStats interface.
func (e *basicCopRuntimeStats) Clone() RuntimeStats {
stats := &basicCopRuntimeStats{
loop: e.loop,
rows: e.rows,
threads: e.threads,
procTimes: e.procTimes,
}
if e.tiflashStats != nil {
stats.tiflashStats = &TiflashStats{
scanContext: e.tiflashStats.scanContext.Clone(),
waitSummary: e.tiflashStats.waitSummary.Clone(),
networkSummary: e.tiflashStats.networkSummary.Clone(),
}
}
return stats
}
// Merge implements the RuntimeStats interface.
func (e *basicCopRuntimeStats) Merge(rs RuntimeStats) {
tmp, ok := rs.(*basicCopRuntimeStats)
if !ok {
return
}
e.loop += tmp.loop
e.rows += tmp.rows
e.threads += tmp.threads
if tmp.procTimes.Size() > 0 {
e.procTimes.MergePercentile(&tmp.procTimes)
}
if tmp.tiflashStats != nil {
if e.tiflashStats == nil {
e.tiflashStats = &TiflashStats{}
}
e.tiflashStats.scanContext.Merge(tmp.tiflashStats.scanContext)
e.tiflashStats.waitSummary.Merge(tmp.tiflashStats.waitSummary)
e.tiflashStats.networkSummary.Merge(tmp.tiflashStats.networkSummary)
}
}
// mergeExecSummary likes Merge, but it merges ExecutorExecutionSummary directly.
func (e *basicCopRuntimeStats) mergeExecSummary(summary *tipb.ExecutorExecutionSummary) {
e.loop += (int32(*summary.NumIterations))
e.rows += (int64(*summary.NumProducedRows))
e.threads += int32(summary.GetConcurrency())
e.procTimes.Add(Duration(int64(*summary.TimeProcessedNs)))
if tiflashScanContext := summary.GetTiflashScanContext(); tiflashScanContext != nil {
if e.tiflashStats == nil {
e.tiflashStats = &TiflashStats{}
}
e.tiflashStats.scanContext.mergeExecSummary(tiflashScanContext)
}
if tiflashWaitSummary := summary.GetTiflashWaitSummary(); tiflashWaitSummary != nil {
if e.tiflashStats == nil {
e.tiflashStats = &TiflashStats{}
}
e.tiflashStats.waitSummary.mergeExecSummary(tiflashWaitSummary, *summary.TimeProcessedNs)
}
if tiflashNetworkSummary := summary.GetTiflashNetworkSummary(); tiflashNetworkSummary != nil {
if e.tiflashStats == nil {
e.tiflashStats = &TiflashStats{}
}
e.tiflashStats.networkSummary.mergeExecSummary(tiflashNetworkSummary)
}
}
// Tp implements the RuntimeStats interface.
func (*basicCopRuntimeStats) Tp() int {
return TpBasicCopRunTimeStats
}
// StmtCopRuntimeStats stores the cop runtime stats of the total statement
type StmtCopRuntimeStats struct {
// TiflashNetworkStats stats all mpp tasks' network traffic info, nil if no any mpp tasks' network traffic
TiflashNetworkStats *TiFlashNetworkTrafficSummary
}
// mergeExecSummary merges ExecutorExecutionSummary into stmt cop runtime stats directly.
func (e *StmtCopRuntimeStats) mergeExecSummary(summary *tipb.ExecutorExecutionSummary) {
if tiflashNetworkSummary := summary.GetTiflashNetworkSummary(); tiflashNetworkSummary != nil {
if e.TiflashNetworkStats == nil {
e.TiflashNetworkStats = &TiFlashNetworkTrafficSummary{}
}
e.TiflashNetworkStats.mergeExecSummary(tiflashNetworkSummary)
}
}
// CopRuntimeStats collects cop tasks' execution info.
type CopRuntimeStats struct {
// stats stores the runtime statistics of coprocessor tasks.
// The key of the map is the tikv-server address. Because a tikv-server can
// have many region leaders, several coprocessor tasks can be sent to the
// same tikv-server instance. We have to use a list to maintain all tasks
// executed on each instance.
stats basicCopRuntimeStats
scanDetail util.ScanDetail
timeDetail util.TimeDetail
storeType kv.StoreType
}
// GetActRows return total rows of CopRuntimeStats.
func (crs *CopRuntimeStats) GetActRows() int64 {
return crs.stats.rows
}
// GetTasks return total tasks of CopRuntimeStats
func (crs *CopRuntimeStats) GetTasks() int32 {
return int32(crs.stats.procTimes.size)
}
var zeroTimeDetail = util.TimeDetail{}
func (crs *CopRuntimeStats) String() string {
procTimes := crs.stats.procTimes
totalTasks := procTimes.size
isTiFlashCop := crs.storeType == kv.TiFlash
buf := bytes.NewBuffer(make([]byte, 0, 16))
{
printTiFlashSpecificInfo := func() {
if isTiFlashCop {
buf.WriteString(", ")
buf.WriteString("threads:")
buf.WriteString(strconv.Itoa(int(crs.stats.threads)))
buf.WriteString("}")
if crs.stats.tiflashStats != nil {
if !crs.stats.tiflashStats.waitSummary.CanBeIgnored() {
buf.WriteString(", ")
buf.WriteString(crs.stats.tiflashStats.waitSummary.String())
}
if !crs.stats.tiflashStats.networkSummary.Empty() {
buf.WriteString(", ")
buf.WriteString(crs.stats.tiflashStats.networkSummary.String())
}
if !crs.stats.tiflashStats.scanContext.Empty() {
buf.WriteString(", ")
buf.WriteString(crs.stats.tiflashStats.scanContext.String())
}
}
} else {
buf.WriteString("}")
}
}
if totalTasks == 1 {
buf.WriteString(crs.storeType.Name())
buf.WriteString("_task:{time:")
buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0))))
buf.WriteString(", loops:")
buf.WriteString(strconv.Itoa(int(crs.stats.loop)))
printTiFlashSpecificInfo()
} else if totalTasks > 0 {
buf.WriteString(crs.storeType.Name())
buf.WriteString("_task:{proc max:")
buf.WriteString(FormatDuration(time.Duration(procTimes.GetMax().GetFloat64())))
buf.WriteString(", min:")
buf.WriteString(FormatDuration(time.Duration(procTimes.GetMin().GetFloat64())))
buf.WriteString(", avg: ")
buf.WriteString(FormatDuration(time.Duration(int64(procTimes.Sum()) / int64(totalTasks))))
buf.WriteString(", p80:")
buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0.8))))
buf.WriteString(", p95:")
buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0.95))))
buf.WriteString(", iters:")
buf.WriteString(strconv.Itoa(int(crs.stats.loop)))
buf.WriteString(", tasks:")
buf.WriteString(strconv.Itoa(totalTasks))
printTiFlashSpecificInfo()
}
}
if !isTiFlashCop {
detail := crs.scanDetail.String()
if detail != "" {
buf.WriteString(", ")
buf.WriteString(detail)
}
if crs.timeDetail != zeroTimeDetail {
timeDetailStr := crs.timeDetail.String()
if timeDetailStr != "" {
buf.WriteString(", ")
buf.WriteString(timeDetailStr)
}
}
}
return buf.String()
}
// BasicRuntimeStats is the basic runtime stats.
type BasicRuntimeStats struct {
// the count of executors with the same id
executorCount atomic.Int32
// executor's Next() called times.
loop atomic.Int32
// executor consume time, including open, next, and close time.
consume atomic.Int64
// executor open time.
open atomic.Int64
// executor close time.
close atomic.Int64
// executor return row count.
rows atomic.Int64
}
// GetActRows return total rows of BasicRuntimeStats.
func (e *BasicRuntimeStats) GetActRows() int64 {
return e.rows.Load()
}
// Clone implements the RuntimeStats interface.
// BasicRuntimeStats shouldn't implement Clone interface because all executors with the same executor_id
// should share the same BasicRuntimeStats, duplicated BasicRuntimeStats are easy to cause mistakes.
func (*BasicRuntimeStats) Clone() RuntimeStats {
panic("BasicRuntimeStats should not implement Clone function")
}
// Merge implements the RuntimeStats interface.
func (e *BasicRuntimeStats) Merge(rs RuntimeStats) {
tmp, ok := rs.(*BasicRuntimeStats)
if !ok {
return
}
e.loop.Add(tmp.loop.Load())
e.consume.Add(tmp.consume.Load())
e.open.Add(tmp.open.Load())
e.close.Add(tmp.close.Load())
e.rows.Add(tmp.rows.Load())
}
// Tp implements the RuntimeStats interface.
func (*BasicRuntimeStats) Tp() int {
return TpBasicRuntimeStats
}
// RootRuntimeStats is the executor runtime stats that combine with multiple runtime stats.
type RootRuntimeStats struct {
basic *BasicRuntimeStats
groupRss []RuntimeStats
}
// NewRootRuntimeStats returns a new RootRuntimeStats
func NewRootRuntimeStats() *RootRuntimeStats {
return &RootRuntimeStats{}
}
// GetActRows return total rows of RootRuntimeStats.
func (e *RootRuntimeStats) GetActRows() int64 {
if e.basic == nil {
return 0
}
return e.basic.rows.Load()
}
// MergeStats merges stats in the RootRuntimeStats and return the stats suitable for display directly.
func (e *RootRuntimeStats) MergeStats() (basic *BasicRuntimeStats, groups []RuntimeStats) {
return e.basic, e.groupRss
}
// String implements the RuntimeStats interface.
func (e *RootRuntimeStats) String() string {
basic, groups := e.MergeStats()
strs := make([]string, 0, len(groups)+1)
if basic != nil {
strs = append(strs, basic.String())
}
for _, group := range groups {
str := group.String()
if len(str) > 0 {
strs = append(strs, str)
}
}
return strings.Join(strs, ", ")
}
// Record records executor's execution.
func (e *BasicRuntimeStats) Record(d time.Duration, rowNum int) {
e.loop.Add(1)
e.consume.Add(int64(d))
e.rows.Add(int64(rowNum))
}
// RecordOpen records executor's open time.
func (e *BasicRuntimeStats) RecordOpen(d time.Duration) {
e.consume.Add(int64(d))
e.open.Add(int64(d))
}
// RecordClose records executor's close time.
func (e *BasicRuntimeStats) RecordClose(d time.Duration) {
e.consume.Add(int64(d))
e.close.Add(int64(d))
}
// SetRowNum sets the row num.
func (e *BasicRuntimeStats) SetRowNum(rowNum int64) {
e.rows.Store(rowNum)
}
// String implements the RuntimeStats interface.
func (e *BasicRuntimeStats) String() string {
if e == nil {
return ""
}
var str strings.Builder
timePrefix := ""
if e.executorCount.Load() > 1 {
timePrefix = "total_"
}
totalTime := e.consume.Load()
openTime := e.open.Load()
closeTime := e.close.Load()
str.WriteString(timePrefix)
str.WriteString("time:")
str.WriteString(FormatDuration(time.Duration(totalTime)))
str.WriteString(", ")
str.WriteString(timePrefix)
str.WriteString("open:")
str.WriteString(FormatDuration(time.Duration(openTime)))
str.WriteString(", ")
str.WriteString(timePrefix)
str.WriteString("close:")
str.WriteString(FormatDuration(time.Duration(closeTime)))
str.WriteString(", loops:")
str.WriteString(strconv.FormatInt(int64(e.loop.Load()), 10))
return str.String()
}
// GetTime get the int64 total time
func (e *BasicRuntimeStats) GetTime() int64 {
return e.consume.Load()
}
// RuntimeStatsColl collects executors's execution info.
type RuntimeStatsColl struct {
rootStats map[int]*RootRuntimeStats
copStats map[int]*CopRuntimeStats
stmtCopStats StmtCopRuntimeStats
mu sync.Mutex
}
// NewRuntimeStatsColl creates new executor collector.
// Reuse the object to reduce allocation when *RuntimeStatsColl is not nil.
func NewRuntimeStatsColl(reuse *RuntimeStatsColl) *RuntimeStatsColl {
if reuse != nil {
// Reuse map is cheaper than create a new map object.
// Go compiler optimize this cleanup code pattern to a clearmap() function.
reuse.mu.Lock()
defer reuse.mu.Unlock()
for k := range reuse.rootStats {
delete(reuse.rootStats, k)
}
for k := range reuse.copStats {
delete(reuse.copStats, k)
}
return reuse
}
return &RuntimeStatsColl{
rootStats: make(map[int]*RootRuntimeStats),
copStats: make(map[int]*CopRuntimeStats),
}
}
// RegisterStats register execStat for a executor.
func (e *RuntimeStatsColl) RegisterStats(planID int, info RuntimeStats) {
e.mu.Lock()
defer e.mu.Unlock()
stats, ok := e.rootStats[planID]
if !ok {
stats = NewRootRuntimeStats()
e.rootStats[planID] = stats
}
tp := info.Tp()
found := false
for _, rss := range stats.groupRss {
if rss.Tp() == tp {
rss.Merge(info)
found = true
break
}
}
if !found {
stats.groupRss = append(stats.groupRss, info)
}
}
// GetBasicRuntimeStats gets basicRuntimeStats for a executor
// When rootStat/rootStat's basicRuntimeStats is nil, the behavior is decided by initNewExecutorStats argument:
// 1. If true, it created a new one, and increase basicRuntimeStats' executorCount
// 2. Else, it returns nil
func (e *RuntimeStatsColl) GetBasicRuntimeStats(planID int, initNewExecutorStats bool) *BasicRuntimeStats {
e.mu.Lock()
defer e.mu.Unlock()
stats, ok := e.rootStats[planID]
if !ok && initNewExecutorStats {
stats = NewRootRuntimeStats()
e.rootStats[planID] = stats
}
if stats == nil {
return nil
}
if stats.basic == nil && initNewExecutorStats {
stats.basic = &BasicRuntimeStats{}
stats.basic.executorCount.Add(1)
} else if stats.basic != nil && initNewExecutorStats {
stats.basic.executorCount.Add(1)
}
return stats.basic
}
// GetStmtCopRuntimeStats gets execStat for a executor.
func (e *RuntimeStatsColl) GetStmtCopRuntimeStats() StmtCopRuntimeStats {
return e.stmtCopStats
}
// GetRootStats gets execStat for a executor.
func (e *RuntimeStatsColl) GetRootStats(planID int) *RootRuntimeStats {
e.mu.Lock()
defer e.mu.Unlock()
runtimeStats, exists := e.rootStats[planID]
if !exists {
runtimeStats = NewRootRuntimeStats()
e.rootStats[planID] = runtimeStats
}
return runtimeStats
}
// GetPlanActRows returns the actual rows of the plan.
func (e *RuntimeStatsColl) GetPlanActRows(planID int) int64 {
e.mu.Lock()
defer e.mu.Unlock()
runtimeStats, exists := e.rootStats[planID]
if !exists {
return 0
}
return runtimeStats.GetActRows()
}
// GetCopStats gets the CopRuntimeStats specified by planID.
func (e *RuntimeStatsColl) GetCopStats(planID int) *CopRuntimeStats {
e.mu.Lock()
defer e.mu.Unlock()
copStats, ok := e.copStats[planID]
if !ok {
return nil
}
return copStats
}
// GetCopCountAndRows returns the total cop-tasks count and total rows of all cop-tasks.
func (e *RuntimeStatsColl) GetCopCountAndRows(planID int) (int32, int64) {
e.mu.Lock()
defer e.mu.Unlock()
copStats, ok := e.copStats[planID]
if !ok {
return 0, 0
}
return copStats.GetTasks(), copStats.GetActRows()
}
func getPlanIDFromExecutionSummary(summary *tipb.ExecutorExecutionSummary) (int, bool) {
if summary.GetExecutorId() != "" {
strs := strings.Split(summary.GetExecutorId(), "_")
if id, err := strconv.Atoi(strs[len(strs)-1]); err == nil {
return id, true
}
}
return 0, false
}
// RecordCopStats records a specific cop tasks's execution detail.
func (e *RuntimeStatsColl) RecordCopStats(planID int, storeType kv.StoreType, scan *util.ScanDetail, time util.TimeDetail, summary *tipb.ExecutorExecutionSummary) int {
e.mu.Lock()
defer e.mu.Unlock()
copStats, ok := e.copStats[planID]
if !ok {
copStats = &CopRuntimeStats{
timeDetail: time,
storeType: storeType,
}
if scan != nil {
copStats.scanDetail = *scan
}
e.copStats[planID] = copStats
} else {
if scan != nil {
copStats.scanDetail.Merge(scan)
}
copStats.timeDetail.Merge(&time)
}
if summary != nil {
// for TiFlash cop response, ExecutorExecutionSummary contains executor id, so if there is a valid executor id in
// summary, use it overwrite the planID
id, valid := getPlanIDFromExecutionSummary(summary)
if valid && id != planID {
planID = id
copStats, ok = e.copStats[planID]
if !ok {
copStats = &CopRuntimeStats{
storeType: storeType,
}
e.copStats[planID] = copStats
}
}
copStats.stats.mergeExecSummary(summary)
e.stmtCopStats.mergeExecSummary(summary)
}
return planID
}
// RecordOneCopTask records a specific cop tasks's execution summary.
func (e *RuntimeStatsColl) RecordOneCopTask(planID int, storeType kv.StoreType, summary *tipb.ExecutorExecutionSummary) int {
// for TiFlash cop response, ExecutorExecutionSummary contains executor id, so if there is a valid executor id in
// summary, use it overwrite the planID
if id, valid := getPlanIDFromExecutionSummary(summary); valid {
planID = id
}
e.mu.Lock()
defer e.mu.Unlock()
copStats, ok := e.copStats[planID]
if !ok {
copStats = &CopRuntimeStats{
storeType: storeType,
}
e.copStats[planID] = copStats
}
copStats.stats.mergeExecSummary(summary)
e.stmtCopStats.mergeExecSummary(summary)
return planID
}
// ExistsRootStats checks if the planID exists in the rootStats collection.
func (e *RuntimeStatsColl) ExistsRootStats(planID int) bool {
e.mu.Lock()
defer e.mu.Unlock()
_, exists := e.rootStats[planID]
return exists
}
// ExistsCopStats checks if the planID exists in the copStats collection.
func (e *RuntimeStatsColl) ExistsCopStats(planID int) bool {
e.mu.Lock()
defer e.mu.Unlock()
_, exists := e.copStats[planID]
return exists
}
// ConcurrencyInfo is used to save the concurrency information of the executor operator
type ConcurrencyInfo struct {
concurrencyName string
concurrencyNum int
}
// NewConcurrencyInfo creates new executor's concurrencyInfo.
func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo {
return &ConcurrencyInfo{name, num}
}
// RuntimeStatsWithConcurrencyInfo is the BasicRuntimeStats with ConcurrencyInfo.
type RuntimeStatsWithConcurrencyInfo struct {
// executor concurrency information
concurrency []*ConcurrencyInfo
// protect concurrency
sync.Mutex
}
// Tp implements the RuntimeStats interface.
func (*RuntimeStatsWithConcurrencyInfo) Tp() int {
return TpRuntimeStatsWithConcurrencyInfo
}
// SetConcurrencyInfo sets the concurrency informations.
// We must clear the concurrencyInfo first when we call the SetConcurrencyInfo.
// When the num <= 0, it means the exector operator is not executed parallel.
func (e *RuntimeStatsWithConcurrencyInfo) SetConcurrencyInfo(infos ...*ConcurrencyInfo) {
e.Lock()
defer e.Unlock()
e.concurrency = e.concurrency[:0]
e.concurrency = append(e.concurrency, infos...)
}
// Clone implements the RuntimeStats interface.
func (e *RuntimeStatsWithConcurrencyInfo) Clone() RuntimeStats {
newRs := &RuntimeStatsWithConcurrencyInfo{
concurrency: make([]*ConcurrencyInfo, 0, len(e.concurrency)),
}
newRs.concurrency = append(newRs.concurrency, e.concurrency...)
return newRs
}
// String implements the RuntimeStats interface.
func (e *RuntimeStatsWithConcurrencyInfo) String() string {
buf := bytes.NewBuffer(make([]byte, 0, 8))
if len(e.concurrency) > 0 {
for i, concurrency := range e.concurrency {
if i > 0 {
buf.WriteString(", ")
}
if concurrency.concurrencyNum > 0 {
buf.WriteString(concurrency.concurrencyName)
buf.WriteByte(':')
buf.WriteString(strconv.Itoa(concurrency.concurrencyNum))
} else {
buf.WriteString(concurrency.concurrencyName)
buf.WriteString(":OFF")
}
}
}
return buf.String()
}
// Merge implements the RuntimeStats interface.
func (*RuntimeStatsWithConcurrencyInfo) Merge(RuntimeStats) {}
// RuntimeStatsWithCommit is the RuntimeStats with commit detail.
type RuntimeStatsWithCommit struct {
Commit *util.CommitDetails
LockKeys *util.LockKeysDetails
TxnCnt int
}
// Tp implements the RuntimeStats interface.
func (*RuntimeStatsWithCommit) Tp() int {
return TpRuntimeStatsWithCommit
}
// MergeCommitDetails merges the commit details.
func (e *RuntimeStatsWithCommit) MergeCommitDetails(detail *util.CommitDetails) {
if detail == nil {
return
}
if e.Commit == nil {
e.Commit = detail
e.TxnCnt = 1
return
}
e.Commit.Merge(detail)
e.TxnCnt++
}
// Merge implements the RuntimeStats interface.
func (e *RuntimeStatsWithCommit) Merge(rs RuntimeStats) {
tmp, ok := rs.(*RuntimeStatsWithCommit)
if !ok {
return
}
e.TxnCnt += tmp.TxnCnt
if tmp.Commit != nil {
if e.Commit == nil {
e.Commit = &util.CommitDetails{}
}
e.Commit.Merge(tmp.Commit)
}
if tmp.LockKeys != nil {
if e.LockKeys == nil {
e.LockKeys = &util.LockKeysDetails{}
}
e.LockKeys.Merge(tmp.LockKeys)
}
}
// Clone implements the RuntimeStats interface.
func (e *RuntimeStatsWithCommit) Clone() RuntimeStats {
newRs := RuntimeStatsWithCommit{
TxnCnt: e.TxnCnt,
}
if e.Commit != nil {
newRs.Commit = e.Commit.Clone()
}
if e.LockKeys != nil {
newRs.LockKeys = e.LockKeys.Clone()
}
return &newRs
}
// String implements the RuntimeStats interface.
func (e *RuntimeStatsWithCommit) String() string {
buf := bytes.NewBuffer(make([]byte, 0, 32))
if e.Commit != nil {
buf.WriteString("commit_txn: {")
// Only print out when there are more than 1 transaction.
if e.TxnCnt > 1 {
buf.WriteString("count: ")
buf.WriteString(strconv.Itoa(e.TxnCnt))
buf.WriteString(", ")
}
if e.Commit.PrewriteTime > 0 {
buf.WriteString("prewrite:")
buf.WriteString(FormatDuration(e.Commit.PrewriteTime))
}
if e.Commit.WaitPrewriteBinlogTime > 0 {
buf.WriteString(", wait_prewrite_binlog:")
buf.WriteString(FormatDuration(e.Commit.WaitPrewriteBinlogTime))
}
if e.Commit.GetCommitTsTime > 0 {
buf.WriteString(", get_commit_ts:")
buf.WriteString(FormatDuration(e.Commit.GetCommitTsTime))
}
if e.Commit.CommitTime > 0 {
buf.WriteString(", commit:")
buf.WriteString(FormatDuration(e.Commit.CommitTime))
}
e.Commit.Mu.Lock()
commitBackoffTime := e.Commit.Mu.CommitBackoffTime
if commitBackoffTime > 0 {
buf.WriteString(", backoff: {time: ")
buf.WriteString(FormatDuration(time.Duration(commitBackoffTime)))
if len(e.Commit.Mu.PrewriteBackoffTypes) > 0 {
buf.WriteString(", prewrite type: ")
e.formatBackoff(buf, e.Commit.Mu.PrewriteBackoffTypes)
}
if len(e.Commit.Mu.CommitBackoffTypes) > 0 {
buf.WriteString(", commit type: ")
e.formatBackoff(buf, e.Commit.Mu.CommitBackoffTypes)
}
buf.WriteString("}")
}
if e.Commit.Mu.SlowestPrewrite.ReqTotalTime > 0 {
buf.WriteString(", slowest_prewrite_rpc: {total: ")
buf.WriteString(strconv.FormatFloat(e.Commit.Mu.SlowestPrewrite.ReqTotalTime.Seconds(), 'f', 3, 64))
buf.WriteString("s, region_id: ")
buf.WriteString(strconv.FormatUint(e.Commit.Mu.SlowestPrewrite.Region, 10))
buf.WriteString(", store: ")
buf.WriteString(e.Commit.Mu.SlowestPrewrite.StoreAddr)
buf.WriteString(", ")
buf.WriteString(e.Commit.Mu.SlowestPrewrite.ExecDetails.String())
buf.WriteString("}")
}
if e.Commit.Mu.CommitPrimary.ReqTotalTime > 0 {
buf.WriteString(", commit_primary_rpc: {total: ")
buf.WriteString(strconv.FormatFloat(e.Commit.Mu.CommitPrimary.ReqTotalTime.Seconds(), 'f', 3, 64))
buf.WriteString("s, region_id: ")
buf.WriteString(strconv.FormatUint(e.Commit.Mu.CommitPrimary.Region, 10))
buf.WriteString(", store: ")
buf.WriteString(e.Commit.Mu.CommitPrimary.StoreAddr)
buf.WriteString(", ")
buf.WriteString(e.Commit.Mu.CommitPrimary.ExecDetails.String())
buf.WriteString("}")
}
e.Commit.Mu.Unlock()
if e.Commit.ResolveLock.ResolveLockTime > 0 {
buf.WriteString(", resolve_lock: ")
buf.WriteString(FormatDuration(time.Duration(e.Commit.ResolveLock.ResolveLockTime)))
}
prewriteRegionNum := atomic.LoadInt32(&e.Commit.PrewriteRegionNum)
if prewriteRegionNum > 0 {
buf.WriteString(", region_num:")
buf.WriteString(strconv.FormatInt(int64(prewriteRegionNum), 10))
}
if e.Commit.WriteKeys > 0 {
buf.WriteString(", write_keys:")
buf.WriteString(strconv.FormatInt(int64(e.Commit.WriteKeys), 10))
}
if e.Commit.WriteSize > 0 {
buf.WriteString(", write_byte:")
buf.WriteString(strconv.FormatInt(int64(e.Commit.WriteSize), 10))
}
if e.Commit.TxnRetry > 0 {
buf.WriteString(", txn_retry:")
buf.WriteString(strconv.FormatInt(int64(e.Commit.TxnRetry), 10))
}
buf.WriteString("}")
}
if e.LockKeys != nil {
if buf.Len() > 0 {
buf.WriteString(", ")
}
buf.WriteString("lock_keys: {")
if e.LockKeys.TotalTime > 0 {
buf.WriteString("time:")
buf.WriteString(FormatDuration(e.LockKeys.TotalTime))
}
if e.LockKeys.RegionNum > 0 {
buf.WriteString(", region:")
buf.WriteString(strconv.FormatInt(int64(e.LockKeys.RegionNum), 10))
}
if e.LockKeys.LockKeys > 0 {
buf.WriteString(", keys:")
buf.WriteString(strconv.FormatInt(int64(e.LockKeys.LockKeys), 10))
}
if e.LockKeys.ResolveLock.ResolveLockTime > 0 {
buf.WriteString(", resolve_lock:")
buf.WriteString(FormatDuration(time.Duration(e.LockKeys.ResolveLock.ResolveLockTime)))
}
e.LockKeys.Mu.Lock()
if e.LockKeys.BackoffTime > 0 {
buf.WriteString(", backoff: {time: ")
buf.WriteString(FormatDuration(time.Duration(e.LockKeys.BackoffTime)))
if len(e.LockKeys.Mu.BackoffTypes) > 0 {
buf.WriteString(", type: ")
e.formatBackoff(buf, e.LockKeys.Mu.BackoffTypes)
}
buf.WriteString("}")
}
if e.LockKeys.Mu.SlowestReqTotalTime > 0 {
buf.WriteString(", slowest_rpc: {total: ")
buf.WriteString(strconv.FormatFloat(e.LockKeys.Mu.SlowestReqTotalTime.Seconds(), 'f', 3, 64))
buf.WriteString("s, region_id: ")
buf.WriteString(strconv.FormatUint(e.LockKeys.Mu.SlowestRegion, 10))
buf.WriteString(", store: ")
buf.WriteString(e.LockKeys.Mu.SlowestStoreAddr)
buf.WriteString(", ")
buf.WriteString(e.LockKeys.Mu.SlowestExecDetails.String())
buf.WriteString("}")
}
e.LockKeys.Mu.Unlock()
if e.LockKeys.LockRPCTime > 0 {
buf.WriteString(", lock_rpc:")
buf.WriteString(time.Duration(e.LockKeys.LockRPCTime).String())
}
if e.LockKeys.LockRPCCount > 0 {
buf.WriteString(", rpc_count:")
buf.WriteString(strconv.FormatInt(e.LockKeys.LockRPCCount, 10))
}
if e.LockKeys.RetryCount > 0 {
buf.WriteString(", retry_count:")
buf.WriteString(strconv.FormatInt(int64(e.LockKeys.RetryCount), 10))
}
buf.WriteString("}")
}
return buf.String()
}
func (*RuntimeStatsWithCommit) formatBackoff(buf *bytes.Buffer, backoffTypes []string) {
if len(backoffTypes) == 0 {
return
}
tpMap := make(map[string]struct{})
tpArray := []string{}
for _, tpStr := range backoffTypes {
_, ok := tpMap[tpStr]
if ok {
continue
}
tpMap[tpStr] = struct{}{}
tpArray = append(tpArray, tpStr)
}
slices.Sort(tpArray)
buf.WriteByte('[')
for i, tp := range tpArray {
if i > 0 {
buf.WriteString(" ")
}
buf.WriteString(tp)
}
buf.WriteByte(']')
}
// RURuntimeStats is a wrapper of util.RUDetails,
// which implements the RuntimeStats interface.
type RURuntimeStats struct {
*util.RUDetails
}
// String implements the RuntimeStats interface.
func (e *RURuntimeStats) String() string {
if e.RUDetails != nil {
buf := bytes.NewBuffer(make([]byte, 0, 8))
buf.WriteString("RU:")
buf.WriteString(strconv.FormatFloat(e.RRU()+e.WRU(), 'f', 2, 64))
return buf.String()
}
return ""
}
// Clone implements the RuntimeStats interface.
func (e *RURuntimeStats) Clone() RuntimeStats {
return &RURuntimeStats{RUDetails: e.RUDetails.Clone()}
}
// Merge implements the RuntimeStats interface.
func (e *RURuntimeStats) Merge(other RuntimeStats) {
if tmp, ok := other.(*RURuntimeStats); ok {
if e.RUDetails != nil {
e.RUDetails.Merge(tmp.RUDetails)
} else {
e.RUDetails = tmp.RUDetails.Clone()
}
}
}
// Tp implements the RuntimeStats interface.
func (*RURuntimeStats) Tp() int {
return TpRURuntimeStats
}