1000 lines
30 KiB
Go
1000 lines
30 KiB
Go
// Copyright 2025 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package execdetails
|
|
|
|
import (
|
|
"bytes"
|
|
"slices"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/pingcap/tidb/pkg/kv"
|
|
"github.com/pingcap/tipb/go-tipb"
|
|
"github.com/tikv/client-go/v2/util"
|
|
)
|
|
|
|
// Type tags reported by RuntimeStats.Tp. The values are produced by iota, so
// the declaration order below fixes each tag's numeric value.
const (
	// TpBasicRuntimeStats is the tp for BasicRuntimeStats.
	TpBasicRuntimeStats int = iota
	// TpRuntimeStatsWithCommit is the tp for RuntimeStatsWithCommit.
	TpRuntimeStatsWithCommit
	// TpRuntimeStatsWithConcurrencyInfo is the tp for RuntimeStatsWithConcurrencyInfo.
	TpRuntimeStatsWithConcurrencyInfo
	// TpSnapshotRuntimeStats is the tp for SnapshotRuntimeStats.
	TpSnapshotRuntimeStats
	// TpHashJoinRuntimeStats is the tp for HashJoinRuntimeStats.
	TpHashJoinRuntimeStats
	// TpHashJoinRuntimeStatsV2 is the tp for hashJoinRuntimeStatsV2.
	TpHashJoinRuntimeStatsV2
	// TpIndexLookUpJoinRuntimeStats is the tp for IndexLookUpJoinRuntimeStats.
	TpIndexLookUpJoinRuntimeStats
	// TpRuntimeStatsWithSnapshot is the tp for RuntimeStatsWithSnapshot.
	TpRuntimeStatsWithSnapshot
	// TpJoinRuntimeStats is the tp for JoinRuntimeStats.
	TpJoinRuntimeStats
	// TpSelectResultRuntimeStats is the tp for SelectResultRuntimeStats.
	TpSelectResultRuntimeStats
	// TpInsertRuntimeStat is the tp for InsertRuntimeStat.
	TpInsertRuntimeStat
	// TpIndexLookUpRunTimeStats is the tp for IndexLookUpRunTimeStats.
	TpIndexLookUpRunTimeStats
	// TpSlowQueryRuntimeStat is the tp for SlowQueryRuntimeStat.
	TpSlowQueryRuntimeStat
	// TpHashAggRuntimeStat is the tp for HashAggRuntimeStat.
	TpHashAggRuntimeStat
	// TpIndexMergeRunTimeStats is the tp for IndexMergeRunTimeStats.
	TpIndexMergeRunTimeStats
	// TpBasicCopRunTimeStats is the tp for BasicCopRunTimeStats.
	TpBasicCopRunTimeStats
	// TpUpdateRuntimeStats is the tp for UpdateRuntimeStats.
	TpUpdateRuntimeStats
	// TpFKCheckRuntimeStats is the tp for FKCheckRuntimeStats.
	TpFKCheckRuntimeStats
	// TpFKCascadeRuntimeStats is the tp for FKCascadeRuntimeStats.
	TpFKCascadeRuntimeStats
	// TpRURuntimeStats is the tp for RURuntimeStats.
	TpRURuntimeStats
)
|
|
|
|
// RuntimeStats is used to express the executor runtime information.
type RuntimeStats interface {
	// String renders the stats as text.
	String() string
	// Merge folds the given stats into the receiver.
	Merge(RuntimeStats)
	// Clone returns a copy of the stats. Not every implementation supports
	// it: BasicRuntimeStats.Clone panics on purpose.
	Clone() RuntimeStats
	// Tp returns the type tag of the implementation (one of the Tp* constants).
	Tp() int
}
|
|
|
|
type basicCopRuntimeStats struct {
|
|
loop int32
|
|
rows int64
|
|
threads int32
|
|
procTimes Percentile[Duration]
|
|
// executor extra infos
|
|
tiflashStats *TiflashStats
|
|
}
|
|
|
|
// String implements the RuntimeStats interface.
|
|
func (e *basicCopRuntimeStats) String() string {
|
|
buf := bytes.NewBuffer(make([]byte, 0, 16))
|
|
buf.WriteString("time:")
|
|
buf.WriteString(FormatDuration(time.Duration(e.procTimes.sumVal)))
|
|
buf.WriteString(", loops:")
|
|
buf.WriteString(strconv.Itoa(int(e.loop)))
|
|
if e.tiflashStats != nil {
|
|
buf.WriteString(", threads:")
|
|
buf.WriteString(strconv.Itoa(int(e.threads)))
|
|
if !e.tiflashStats.waitSummary.CanBeIgnored() {
|
|
buf.WriteString(", ")
|
|
buf.WriteString(e.tiflashStats.waitSummary.String())
|
|
}
|
|
if !e.tiflashStats.networkSummary.Empty() {
|
|
buf.WriteString(", ")
|
|
buf.WriteString(e.tiflashStats.networkSummary.String())
|
|
}
|
|
buf.WriteString(", ")
|
|
buf.WriteString(e.tiflashStats.scanContext.String())
|
|
}
|
|
return buf.String()
|
|
}
|
|
|
|
// Clone implements the RuntimeStats interface.
|
|
func (e *basicCopRuntimeStats) Clone() RuntimeStats {
|
|
stats := &basicCopRuntimeStats{
|
|
loop: e.loop,
|
|
rows: e.rows,
|
|
threads: e.threads,
|
|
procTimes: e.procTimes,
|
|
}
|
|
if e.tiflashStats != nil {
|
|
stats.tiflashStats = &TiflashStats{
|
|
scanContext: e.tiflashStats.scanContext.Clone(),
|
|
waitSummary: e.tiflashStats.waitSummary.Clone(),
|
|
networkSummary: e.tiflashStats.networkSummary.Clone(),
|
|
}
|
|
}
|
|
return stats
|
|
}
|
|
|
|
// Merge implements the RuntimeStats interface.
|
|
func (e *basicCopRuntimeStats) Merge(rs RuntimeStats) {
|
|
tmp, ok := rs.(*basicCopRuntimeStats)
|
|
if !ok {
|
|
return
|
|
}
|
|
e.loop += tmp.loop
|
|
e.rows += tmp.rows
|
|
e.threads += tmp.threads
|
|
if tmp.procTimes.Size() > 0 {
|
|
e.procTimes.MergePercentile(&tmp.procTimes)
|
|
}
|
|
if tmp.tiflashStats != nil {
|
|
if e.tiflashStats == nil {
|
|
e.tiflashStats = &TiflashStats{}
|
|
}
|
|
e.tiflashStats.scanContext.Merge(tmp.tiflashStats.scanContext)
|
|
e.tiflashStats.waitSummary.Merge(tmp.tiflashStats.waitSummary)
|
|
e.tiflashStats.networkSummary.Merge(tmp.tiflashStats.networkSummary)
|
|
}
|
|
}
|
|
|
|
// mergeExecSummary likes Merge, but it merges ExecutorExecutionSummary directly.
|
|
func (e *basicCopRuntimeStats) mergeExecSummary(summary *tipb.ExecutorExecutionSummary) {
|
|
e.loop += (int32(*summary.NumIterations))
|
|
e.rows += (int64(*summary.NumProducedRows))
|
|
e.threads += int32(summary.GetConcurrency())
|
|
e.procTimes.Add(Duration(int64(*summary.TimeProcessedNs)))
|
|
if tiflashScanContext := summary.GetTiflashScanContext(); tiflashScanContext != nil {
|
|
if e.tiflashStats == nil {
|
|
e.tiflashStats = &TiflashStats{}
|
|
}
|
|
e.tiflashStats.scanContext.mergeExecSummary(tiflashScanContext)
|
|
}
|
|
if tiflashWaitSummary := summary.GetTiflashWaitSummary(); tiflashWaitSummary != nil {
|
|
if e.tiflashStats == nil {
|
|
e.tiflashStats = &TiflashStats{}
|
|
}
|
|
e.tiflashStats.waitSummary.mergeExecSummary(tiflashWaitSummary, *summary.TimeProcessedNs)
|
|
}
|
|
if tiflashNetworkSummary := summary.GetTiflashNetworkSummary(); tiflashNetworkSummary != nil {
|
|
if e.tiflashStats == nil {
|
|
e.tiflashStats = &TiflashStats{}
|
|
}
|
|
e.tiflashStats.networkSummary.mergeExecSummary(tiflashNetworkSummary)
|
|
}
|
|
}
|
|
|
|
// Tp implements the RuntimeStats interface.
|
|
func (*basicCopRuntimeStats) Tp() int {
|
|
return TpBasicCopRunTimeStats
|
|
}
|
|
|
|
// StmtCopRuntimeStats stores the cop runtime stats of the total statement
|
|
type StmtCopRuntimeStats struct {
|
|
// TiflashNetworkStats stats all mpp tasks' network traffic info, nil if no any mpp tasks' network traffic
|
|
TiflashNetworkStats *TiFlashNetworkTrafficSummary
|
|
}
|
|
|
|
// mergeExecSummary merges ExecutorExecutionSummary into stmt cop runtime stats directly.
|
|
func (e *StmtCopRuntimeStats) mergeExecSummary(summary *tipb.ExecutorExecutionSummary) {
|
|
if tiflashNetworkSummary := summary.GetTiflashNetworkSummary(); tiflashNetworkSummary != nil {
|
|
if e.TiflashNetworkStats == nil {
|
|
e.TiflashNetworkStats = &TiFlashNetworkTrafficSummary{}
|
|
}
|
|
e.TiflashNetworkStats.mergeExecSummary(tiflashNetworkSummary)
|
|
}
|
|
}
|
|
|
|
// CopRuntimeStats collects cop tasks' execution info.
|
|
type CopRuntimeStats struct {
|
|
// stats stores the runtime statistics of coprocessor tasks.
|
|
// The key of the map is the tikv-server address. Because a tikv-server can
|
|
// have many region leaders, several coprocessor tasks can be sent to the
|
|
// same tikv-server instance. We have to use a list to maintain all tasks
|
|
// executed on each instance.
|
|
stats basicCopRuntimeStats
|
|
scanDetail util.ScanDetail
|
|
timeDetail util.TimeDetail
|
|
storeType kv.StoreType
|
|
}
|
|
|
|
// GetActRows return total rows of CopRuntimeStats.
|
|
func (crs *CopRuntimeStats) GetActRows() int64 {
|
|
return crs.stats.rows
|
|
}
|
|
|
|
// GetTasks return total tasks of CopRuntimeStats
|
|
func (crs *CopRuntimeStats) GetTasks() int32 {
|
|
return int32(crs.stats.procTimes.size)
|
|
}
|
|
|
|
// zeroTimeDetail is the zero value of util.TimeDetail; CopRuntimeStats.String
// compares against it to decide whether a time detail was recorded.
var zeroTimeDetail util.TimeDetail
|
|
|
|
func (crs *CopRuntimeStats) String() string {
|
|
procTimes := crs.stats.procTimes
|
|
totalTasks := procTimes.size
|
|
isTiFlashCop := crs.storeType == kv.TiFlash
|
|
buf := bytes.NewBuffer(make([]byte, 0, 16))
|
|
{
|
|
printTiFlashSpecificInfo := func() {
|
|
if isTiFlashCop {
|
|
buf.WriteString(", ")
|
|
buf.WriteString("threads:")
|
|
buf.WriteString(strconv.Itoa(int(crs.stats.threads)))
|
|
buf.WriteString("}")
|
|
if crs.stats.tiflashStats != nil {
|
|
if !crs.stats.tiflashStats.waitSummary.CanBeIgnored() {
|
|
buf.WriteString(", ")
|
|
buf.WriteString(crs.stats.tiflashStats.waitSummary.String())
|
|
}
|
|
if !crs.stats.tiflashStats.networkSummary.Empty() {
|
|
buf.WriteString(", ")
|
|
buf.WriteString(crs.stats.tiflashStats.networkSummary.String())
|
|
}
|
|
if !crs.stats.tiflashStats.scanContext.Empty() {
|
|
buf.WriteString(", ")
|
|
buf.WriteString(crs.stats.tiflashStats.scanContext.String())
|
|
}
|
|
}
|
|
} else {
|
|
buf.WriteString("}")
|
|
}
|
|
}
|
|
if totalTasks == 1 {
|
|
buf.WriteString(crs.storeType.Name())
|
|
buf.WriteString("_task:{time:")
|
|
buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0))))
|
|
buf.WriteString(", loops:")
|
|
buf.WriteString(strconv.Itoa(int(crs.stats.loop)))
|
|
printTiFlashSpecificInfo()
|
|
} else if totalTasks > 0 {
|
|
buf.WriteString(crs.storeType.Name())
|
|
buf.WriteString("_task:{proc max:")
|
|
buf.WriteString(FormatDuration(time.Duration(procTimes.GetMax().GetFloat64())))
|
|
buf.WriteString(", min:")
|
|
buf.WriteString(FormatDuration(time.Duration(procTimes.GetMin().GetFloat64())))
|
|
buf.WriteString(", avg: ")
|
|
buf.WriteString(FormatDuration(time.Duration(int64(procTimes.Sum()) / int64(totalTasks))))
|
|
buf.WriteString(", p80:")
|
|
buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0.8))))
|
|
buf.WriteString(", p95:")
|
|
buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0.95))))
|
|
buf.WriteString(", iters:")
|
|
buf.WriteString(strconv.Itoa(int(crs.stats.loop)))
|
|
buf.WriteString(", tasks:")
|
|
buf.WriteString(strconv.Itoa(totalTasks))
|
|
printTiFlashSpecificInfo()
|
|
}
|
|
}
|
|
if !isTiFlashCop {
|
|
detail := crs.scanDetail.String()
|
|
if detail != "" {
|
|
buf.WriteString(", ")
|
|
buf.WriteString(detail)
|
|
}
|
|
if crs.timeDetail != zeroTimeDetail {
|
|
timeDetailStr := crs.timeDetail.String()
|
|
if timeDetailStr != "" {
|
|
buf.WriteString(", ")
|
|
buf.WriteString(timeDetailStr)
|
|
}
|
|
}
|
|
}
|
|
return buf.String()
|
|
}
|
|
|
|
// BasicRuntimeStats is the basic runtime stats.
// All fields are atomics, so they are read and updated without extra locking.
type BasicRuntimeStats struct {
	// executorCount is the number of executors with the same id.
	executorCount atomic.Int32
	// loop counts how many times the executor's Next() was called.
	loop atomic.Int32
	// consume is the time the executor consumed, including open, next, and close time.
	consume atomic.Int64
	// open is the executor's open time.
	open atomic.Int64
	// close is the executor's close time.
	close atomic.Int64
	// rows is the number of rows the executor returned.
	rows atomic.Int64
}
|
|
|
|
// GetActRows return total rows of BasicRuntimeStats.
|
|
func (e *BasicRuntimeStats) GetActRows() int64 {
|
|
return e.rows.Load()
|
|
}
|
|
|
|
// Clone implements the RuntimeStats interface.
|
|
// BasicRuntimeStats shouldn't implement Clone interface because all executors with the same executor_id
|
|
// should share the same BasicRuntimeStats, duplicated BasicRuntimeStats are easy to cause mistakes.
|
|
func (*BasicRuntimeStats) Clone() RuntimeStats {
|
|
panic("BasicRuntimeStats should not implement Clone function")
|
|
}
|
|
|
|
// Merge implements the RuntimeStats interface.
|
|
func (e *BasicRuntimeStats) Merge(rs RuntimeStats) {
|
|
tmp, ok := rs.(*BasicRuntimeStats)
|
|
if !ok {
|
|
return
|
|
}
|
|
e.loop.Add(tmp.loop.Load())
|
|
e.consume.Add(tmp.consume.Load())
|
|
e.open.Add(tmp.open.Load())
|
|
e.close.Add(tmp.close.Load())
|
|
e.rows.Add(tmp.rows.Load())
|
|
}
|
|
|
|
// Tp implements the RuntimeStats interface.
|
|
func (*BasicRuntimeStats) Tp() int {
|
|
return TpBasicRuntimeStats
|
|
}
|
|
|
|
// RootRuntimeStats is the executor runtime stats that combine with multiple runtime stats.
|
|
type RootRuntimeStats struct {
|
|
basic *BasicRuntimeStats
|
|
groupRss []RuntimeStats
|
|
}
|
|
|
|
// NewRootRuntimeStats returns a new RootRuntimeStats
|
|
func NewRootRuntimeStats() *RootRuntimeStats {
|
|
return &RootRuntimeStats{}
|
|
}
|
|
|
|
// GetActRows return total rows of RootRuntimeStats.
|
|
func (e *RootRuntimeStats) GetActRows() int64 {
|
|
if e.basic == nil {
|
|
return 0
|
|
}
|
|
return e.basic.rows.Load()
|
|
}
|
|
|
|
// MergeStats merges stats in the RootRuntimeStats and return the stats suitable for display directly.
|
|
func (e *RootRuntimeStats) MergeStats() (basic *BasicRuntimeStats, groups []RuntimeStats) {
|
|
return e.basic, e.groupRss
|
|
}
|
|
|
|
// String implements the RuntimeStats interface.
|
|
func (e *RootRuntimeStats) String() string {
|
|
basic, groups := e.MergeStats()
|
|
strs := make([]string, 0, len(groups)+1)
|
|
if basic != nil {
|
|
strs = append(strs, basic.String())
|
|
}
|
|
for _, group := range groups {
|
|
str := group.String()
|
|
if len(str) > 0 {
|
|
strs = append(strs, str)
|
|
}
|
|
}
|
|
return strings.Join(strs, ", ")
|
|
}
|
|
|
|
// Record records executor's execution.
|
|
func (e *BasicRuntimeStats) Record(d time.Duration, rowNum int) {
|
|
e.loop.Add(1)
|
|
e.consume.Add(int64(d))
|
|
e.rows.Add(int64(rowNum))
|
|
}
|
|
|
|
// RecordOpen records executor's open time.
|
|
func (e *BasicRuntimeStats) RecordOpen(d time.Duration) {
|
|
e.consume.Add(int64(d))
|
|
e.open.Add(int64(d))
|
|
}
|
|
|
|
// RecordClose records executor's close time.
|
|
func (e *BasicRuntimeStats) RecordClose(d time.Duration) {
|
|
e.consume.Add(int64(d))
|
|
e.close.Add(int64(d))
|
|
}
|
|
|
|
// SetRowNum sets the row num.
|
|
func (e *BasicRuntimeStats) SetRowNum(rowNum int64) {
|
|
e.rows.Store(rowNum)
|
|
}
|
|
|
|
// String implements the RuntimeStats interface.
|
|
func (e *BasicRuntimeStats) String() string {
|
|
if e == nil {
|
|
return ""
|
|
}
|
|
var str strings.Builder
|
|
timePrefix := ""
|
|
if e.executorCount.Load() > 1 {
|
|
timePrefix = "total_"
|
|
}
|
|
totalTime := e.consume.Load()
|
|
openTime := e.open.Load()
|
|
closeTime := e.close.Load()
|
|
str.WriteString(timePrefix)
|
|
str.WriteString("time:")
|
|
str.WriteString(FormatDuration(time.Duration(totalTime)))
|
|
str.WriteString(", ")
|
|
str.WriteString(timePrefix)
|
|
str.WriteString("open:")
|
|
str.WriteString(FormatDuration(time.Duration(openTime)))
|
|
str.WriteString(", ")
|
|
str.WriteString(timePrefix)
|
|
str.WriteString("close:")
|
|
str.WriteString(FormatDuration(time.Duration(closeTime)))
|
|
str.WriteString(", loops:")
|
|
str.WriteString(strconv.FormatInt(int64(e.loop.Load()), 10))
|
|
return str.String()
|
|
}
|
|
|
|
// GetTime get the int64 total time
|
|
func (e *BasicRuntimeStats) GetTime() int64 {
|
|
return e.consume.Load()
|
|
}
|
|
|
|
// RuntimeStatsColl collects executors's execution info.
|
|
type RuntimeStatsColl struct {
|
|
rootStats map[int]*RootRuntimeStats
|
|
copStats map[int]*CopRuntimeStats
|
|
stmtCopStats StmtCopRuntimeStats
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// NewRuntimeStatsColl creates new executor collector.
|
|
// Reuse the object to reduce allocation when *RuntimeStatsColl is not nil.
|
|
func NewRuntimeStatsColl(reuse *RuntimeStatsColl) *RuntimeStatsColl {
|
|
if reuse != nil {
|
|
// Reuse map is cheaper than create a new map object.
|
|
// Go compiler optimize this cleanup code pattern to a clearmap() function.
|
|
reuse.mu.Lock()
|
|
defer reuse.mu.Unlock()
|
|
for k := range reuse.rootStats {
|
|
delete(reuse.rootStats, k)
|
|
}
|
|
for k := range reuse.copStats {
|
|
delete(reuse.copStats, k)
|
|
}
|
|
return reuse
|
|
}
|
|
return &RuntimeStatsColl{
|
|
rootStats: make(map[int]*RootRuntimeStats),
|
|
copStats: make(map[int]*CopRuntimeStats),
|
|
}
|
|
}
|
|
|
|
// RegisterStats register execStat for a executor.
|
|
func (e *RuntimeStatsColl) RegisterStats(planID int, info RuntimeStats) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
stats, ok := e.rootStats[planID]
|
|
if !ok {
|
|
stats = NewRootRuntimeStats()
|
|
e.rootStats[planID] = stats
|
|
}
|
|
tp := info.Tp()
|
|
found := false
|
|
for _, rss := range stats.groupRss {
|
|
if rss.Tp() == tp {
|
|
rss.Merge(info)
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
stats.groupRss = append(stats.groupRss, info)
|
|
}
|
|
}
|
|
|
|
// GetBasicRuntimeStats gets basicRuntimeStats for a executor
|
|
// When rootStat/rootStat's basicRuntimeStats is nil, the behavior is decided by initNewExecutorStats argument:
|
|
// 1. If true, it created a new one, and increase basicRuntimeStats' executorCount
|
|
// 2. Else, it returns nil
|
|
func (e *RuntimeStatsColl) GetBasicRuntimeStats(planID int, initNewExecutorStats bool) *BasicRuntimeStats {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
stats, ok := e.rootStats[planID]
|
|
if !ok && initNewExecutorStats {
|
|
stats = NewRootRuntimeStats()
|
|
e.rootStats[planID] = stats
|
|
}
|
|
if stats == nil {
|
|
return nil
|
|
}
|
|
|
|
if stats.basic == nil && initNewExecutorStats {
|
|
stats.basic = &BasicRuntimeStats{}
|
|
stats.basic.executorCount.Add(1)
|
|
} else if stats.basic != nil && initNewExecutorStats {
|
|
stats.basic.executorCount.Add(1)
|
|
}
|
|
return stats.basic
|
|
}
|
|
|
|
// GetStmtCopRuntimeStats gets execStat for a executor.
|
|
func (e *RuntimeStatsColl) GetStmtCopRuntimeStats() StmtCopRuntimeStats {
|
|
return e.stmtCopStats
|
|
}
|
|
|
|
// GetRootStats gets execStat for a executor.
|
|
func (e *RuntimeStatsColl) GetRootStats(planID int) *RootRuntimeStats {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
runtimeStats, exists := e.rootStats[planID]
|
|
if !exists {
|
|
runtimeStats = NewRootRuntimeStats()
|
|
e.rootStats[planID] = runtimeStats
|
|
}
|
|
return runtimeStats
|
|
}
|
|
|
|
// GetPlanActRows returns the actual rows of the plan.
|
|
func (e *RuntimeStatsColl) GetPlanActRows(planID int) int64 {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
runtimeStats, exists := e.rootStats[planID]
|
|
if !exists {
|
|
return 0
|
|
}
|
|
return runtimeStats.GetActRows()
|
|
}
|
|
|
|
// GetCopStats gets the CopRuntimeStats specified by planID.
|
|
func (e *RuntimeStatsColl) GetCopStats(planID int) *CopRuntimeStats {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
copStats, ok := e.copStats[planID]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return copStats
|
|
}
|
|
|
|
// GetCopCountAndRows returns the total cop-tasks count and total rows of all cop-tasks.
|
|
func (e *RuntimeStatsColl) GetCopCountAndRows(planID int) (int32, int64) {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
copStats, ok := e.copStats[planID]
|
|
if !ok {
|
|
return 0, 0
|
|
}
|
|
return copStats.GetTasks(), copStats.GetActRows()
|
|
}
|
|
|
|
func getPlanIDFromExecutionSummary(summary *tipb.ExecutorExecutionSummary) (int, bool) {
|
|
if summary.GetExecutorId() != "" {
|
|
strs := strings.Split(summary.GetExecutorId(), "_")
|
|
if id, err := strconv.Atoi(strs[len(strs)-1]); err == nil {
|
|
return id, true
|
|
}
|
|
}
|
|
return 0, false
|
|
}
|
|
|
|
// RecordCopStats records a specific cop tasks's execution detail.
|
|
func (e *RuntimeStatsColl) RecordCopStats(planID int, storeType kv.StoreType, scan *util.ScanDetail, time util.TimeDetail, summary *tipb.ExecutorExecutionSummary) int {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
copStats, ok := e.copStats[planID]
|
|
if !ok {
|
|
copStats = &CopRuntimeStats{
|
|
timeDetail: time,
|
|
storeType: storeType,
|
|
}
|
|
if scan != nil {
|
|
copStats.scanDetail = *scan
|
|
}
|
|
e.copStats[planID] = copStats
|
|
} else {
|
|
if scan != nil {
|
|
copStats.scanDetail.Merge(scan)
|
|
}
|
|
copStats.timeDetail.Merge(&time)
|
|
}
|
|
if summary != nil {
|
|
// for TiFlash cop response, ExecutorExecutionSummary contains executor id, so if there is a valid executor id in
|
|
// summary, use it overwrite the planID
|
|
id, valid := getPlanIDFromExecutionSummary(summary)
|
|
if valid && id != planID {
|
|
planID = id
|
|
copStats, ok = e.copStats[planID]
|
|
if !ok {
|
|
copStats = &CopRuntimeStats{
|
|
storeType: storeType,
|
|
}
|
|
e.copStats[planID] = copStats
|
|
}
|
|
}
|
|
copStats.stats.mergeExecSummary(summary)
|
|
e.stmtCopStats.mergeExecSummary(summary)
|
|
}
|
|
return planID
|
|
}
|
|
|
|
// RecordOneCopTask records a specific cop tasks's execution summary.
|
|
func (e *RuntimeStatsColl) RecordOneCopTask(planID int, storeType kv.StoreType, summary *tipb.ExecutorExecutionSummary) int {
|
|
// for TiFlash cop response, ExecutorExecutionSummary contains executor id, so if there is a valid executor id in
|
|
// summary, use it overwrite the planID
|
|
if id, valid := getPlanIDFromExecutionSummary(summary); valid {
|
|
planID = id
|
|
}
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
copStats, ok := e.copStats[planID]
|
|
if !ok {
|
|
copStats = &CopRuntimeStats{
|
|
storeType: storeType,
|
|
}
|
|
e.copStats[planID] = copStats
|
|
}
|
|
copStats.stats.mergeExecSummary(summary)
|
|
e.stmtCopStats.mergeExecSummary(summary)
|
|
return planID
|
|
}
|
|
|
|
// ExistsRootStats checks if the planID exists in the rootStats collection.
|
|
func (e *RuntimeStatsColl) ExistsRootStats(planID int) bool {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
_, exists := e.rootStats[planID]
|
|
return exists
|
|
}
|
|
|
|
// ExistsCopStats checks if the planID exists in the copStats collection.
|
|
func (e *RuntimeStatsColl) ExistsCopStats(planID int) bool {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
_, exists := e.copStats[planID]
|
|
return exists
|
|
}
|
|
|
|
// ConcurrencyInfo is used to save the concurrency information of the executor
// operator. The field order matters: NewConcurrencyInfo builds the value with
// a positional literal.
type ConcurrencyInfo struct {
	// concurrencyName is the label printed in front of the number.
	concurrencyName string
	// concurrencyNum is the concurrency; values <= 0 are printed as "OFF".
	concurrencyNum int
}
|
|
|
|
// NewConcurrencyInfo creates new executor's concurrencyInfo.
|
|
func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo {
|
|
return &ConcurrencyInfo{name, num}
|
|
}
|
|
|
|
// RuntimeStatsWithConcurrencyInfo is the BasicRuntimeStats with ConcurrencyInfo.
|
|
type RuntimeStatsWithConcurrencyInfo struct {
|
|
// executor concurrency information
|
|
concurrency []*ConcurrencyInfo
|
|
// protect concurrency
|
|
sync.Mutex
|
|
}
|
|
|
|
// Tp implements the RuntimeStats interface.
|
|
func (*RuntimeStatsWithConcurrencyInfo) Tp() int {
|
|
return TpRuntimeStatsWithConcurrencyInfo
|
|
}
|
|
|
|
// SetConcurrencyInfo sets the concurrency informations.
|
|
// We must clear the concurrencyInfo first when we call the SetConcurrencyInfo.
|
|
// When the num <= 0, it means the exector operator is not executed parallel.
|
|
func (e *RuntimeStatsWithConcurrencyInfo) SetConcurrencyInfo(infos ...*ConcurrencyInfo) {
|
|
e.Lock()
|
|
defer e.Unlock()
|
|
e.concurrency = e.concurrency[:0]
|
|
e.concurrency = append(e.concurrency, infos...)
|
|
}
|
|
|
|
// Clone implements the RuntimeStats interface.
|
|
func (e *RuntimeStatsWithConcurrencyInfo) Clone() RuntimeStats {
|
|
newRs := &RuntimeStatsWithConcurrencyInfo{
|
|
concurrency: make([]*ConcurrencyInfo, 0, len(e.concurrency)),
|
|
}
|
|
newRs.concurrency = append(newRs.concurrency, e.concurrency...)
|
|
return newRs
|
|
}
|
|
|
|
// String implements the RuntimeStats interface.
|
|
func (e *RuntimeStatsWithConcurrencyInfo) String() string {
|
|
buf := bytes.NewBuffer(make([]byte, 0, 8))
|
|
if len(e.concurrency) > 0 {
|
|
for i, concurrency := range e.concurrency {
|
|
if i > 0 {
|
|
buf.WriteString(", ")
|
|
}
|
|
if concurrency.concurrencyNum > 0 {
|
|
buf.WriteString(concurrency.concurrencyName)
|
|
buf.WriteByte(':')
|
|
buf.WriteString(strconv.Itoa(concurrency.concurrencyNum))
|
|
} else {
|
|
buf.WriteString(concurrency.concurrencyName)
|
|
buf.WriteString(":OFF")
|
|
}
|
|
}
|
|
}
|
|
return buf.String()
|
|
}
|
|
|
|
// Merge implements the RuntimeStats interface.
|
|
func (*RuntimeStatsWithConcurrencyInfo) Merge(RuntimeStats) {}
|
|
|
|
// RuntimeStatsWithCommit is the RuntimeStats with commit detail.
|
|
type RuntimeStatsWithCommit struct {
|
|
Commit *util.CommitDetails
|
|
LockKeys *util.LockKeysDetails
|
|
TxnCnt int
|
|
}
|
|
|
|
// Tp implements the RuntimeStats interface.
|
|
func (*RuntimeStatsWithCommit) Tp() int {
|
|
return TpRuntimeStatsWithCommit
|
|
}
|
|
|
|
// MergeCommitDetails merges the commit details.
|
|
func (e *RuntimeStatsWithCommit) MergeCommitDetails(detail *util.CommitDetails) {
|
|
if detail == nil {
|
|
return
|
|
}
|
|
if e.Commit == nil {
|
|
e.Commit = detail
|
|
e.TxnCnt = 1
|
|
return
|
|
}
|
|
e.Commit.Merge(detail)
|
|
e.TxnCnt++
|
|
}
|
|
|
|
// Merge implements the RuntimeStats interface.
|
|
func (e *RuntimeStatsWithCommit) Merge(rs RuntimeStats) {
|
|
tmp, ok := rs.(*RuntimeStatsWithCommit)
|
|
if !ok {
|
|
return
|
|
}
|
|
e.TxnCnt += tmp.TxnCnt
|
|
if tmp.Commit != nil {
|
|
if e.Commit == nil {
|
|
e.Commit = &util.CommitDetails{}
|
|
}
|
|
e.Commit.Merge(tmp.Commit)
|
|
}
|
|
|
|
if tmp.LockKeys != nil {
|
|
if e.LockKeys == nil {
|
|
e.LockKeys = &util.LockKeysDetails{}
|
|
}
|
|
e.LockKeys.Merge(tmp.LockKeys)
|
|
}
|
|
}
|
|
|
|
// Clone implements the RuntimeStats interface.
|
|
func (e *RuntimeStatsWithCommit) Clone() RuntimeStats {
|
|
newRs := RuntimeStatsWithCommit{
|
|
TxnCnt: e.TxnCnt,
|
|
}
|
|
if e.Commit != nil {
|
|
newRs.Commit = e.Commit.Clone()
|
|
}
|
|
if e.LockKeys != nil {
|
|
newRs.LockKeys = e.LockKeys.Clone()
|
|
}
|
|
return &newRs
|
|
}
|
|
|
|
// String implements the RuntimeStats interface.
|
|
func (e *RuntimeStatsWithCommit) String() string {
|
|
buf := bytes.NewBuffer(make([]byte, 0, 32))
|
|
if e.Commit != nil {
|
|
buf.WriteString("commit_txn: {")
|
|
// Only print out when there are more than 1 transaction.
|
|
if e.TxnCnt > 1 {
|
|
buf.WriteString("count: ")
|
|
buf.WriteString(strconv.Itoa(e.TxnCnt))
|
|
buf.WriteString(", ")
|
|
}
|
|
if e.Commit.PrewriteTime > 0 {
|
|
buf.WriteString("prewrite:")
|
|
buf.WriteString(FormatDuration(e.Commit.PrewriteTime))
|
|
}
|
|
if e.Commit.WaitPrewriteBinlogTime > 0 {
|
|
buf.WriteString(", wait_prewrite_binlog:")
|
|
buf.WriteString(FormatDuration(e.Commit.WaitPrewriteBinlogTime))
|
|
}
|
|
if e.Commit.GetCommitTsTime > 0 {
|
|
buf.WriteString(", get_commit_ts:")
|
|
buf.WriteString(FormatDuration(e.Commit.GetCommitTsTime))
|
|
}
|
|
if e.Commit.CommitTime > 0 {
|
|
buf.WriteString(", commit:")
|
|
buf.WriteString(FormatDuration(e.Commit.CommitTime))
|
|
}
|
|
e.Commit.Mu.Lock()
|
|
commitBackoffTime := e.Commit.Mu.CommitBackoffTime
|
|
if commitBackoffTime > 0 {
|
|
buf.WriteString(", backoff: {time: ")
|
|
buf.WriteString(FormatDuration(time.Duration(commitBackoffTime)))
|
|
if len(e.Commit.Mu.PrewriteBackoffTypes) > 0 {
|
|
buf.WriteString(", prewrite type: ")
|
|
e.formatBackoff(buf, e.Commit.Mu.PrewriteBackoffTypes)
|
|
}
|
|
if len(e.Commit.Mu.CommitBackoffTypes) > 0 {
|
|
buf.WriteString(", commit type: ")
|
|
e.formatBackoff(buf, e.Commit.Mu.CommitBackoffTypes)
|
|
}
|
|
buf.WriteString("}")
|
|
}
|
|
if e.Commit.Mu.SlowestPrewrite.ReqTotalTime > 0 {
|
|
buf.WriteString(", slowest_prewrite_rpc: {total: ")
|
|
buf.WriteString(strconv.FormatFloat(e.Commit.Mu.SlowestPrewrite.ReqTotalTime.Seconds(), 'f', 3, 64))
|
|
buf.WriteString("s, region_id: ")
|
|
buf.WriteString(strconv.FormatUint(e.Commit.Mu.SlowestPrewrite.Region, 10))
|
|
buf.WriteString(", store: ")
|
|
buf.WriteString(e.Commit.Mu.SlowestPrewrite.StoreAddr)
|
|
buf.WriteString(", ")
|
|
buf.WriteString(e.Commit.Mu.SlowestPrewrite.ExecDetails.String())
|
|
buf.WriteString("}")
|
|
}
|
|
if e.Commit.Mu.CommitPrimary.ReqTotalTime > 0 {
|
|
buf.WriteString(", commit_primary_rpc: {total: ")
|
|
buf.WriteString(strconv.FormatFloat(e.Commit.Mu.CommitPrimary.ReqTotalTime.Seconds(), 'f', 3, 64))
|
|
buf.WriteString("s, region_id: ")
|
|
buf.WriteString(strconv.FormatUint(e.Commit.Mu.CommitPrimary.Region, 10))
|
|
buf.WriteString(", store: ")
|
|
buf.WriteString(e.Commit.Mu.CommitPrimary.StoreAddr)
|
|
buf.WriteString(", ")
|
|
buf.WriteString(e.Commit.Mu.CommitPrimary.ExecDetails.String())
|
|
buf.WriteString("}")
|
|
}
|
|
e.Commit.Mu.Unlock()
|
|
if e.Commit.ResolveLock.ResolveLockTime > 0 {
|
|
buf.WriteString(", resolve_lock: ")
|
|
buf.WriteString(FormatDuration(time.Duration(e.Commit.ResolveLock.ResolveLockTime)))
|
|
}
|
|
|
|
prewriteRegionNum := atomic.LoadInt32(&e.Commit.PrewriteRegionNum)
|
|
if prewriteRegionNum > 0 {
|
|
buf.WriteString(", region_num:")
|
|
buf.WriteString(strconv.FormatInt(int64(prewriteRegionNum), 10))
|
|
}
|
|
if e.Commit.WriteKeys > 0 {
|
|
buf.WriteString(", write_keys:")
|
|
buf.WriteString(strconv.FormatInt(int64(e.Commit.WriteKeys), 10))
|
|
}
|
|
if e.Commit.WriteSize > 0 {
|
|
buf.WriteString(", write_byte:")
|
|
buf.WriteString(strconv.FormatInt(int64(e.Commit.WriteSize), 10))
|
|
}
|
|
if e.Commit.TxnRetry > 0 {
|
|
buf.WriteString(", txn_retry:")
|
|
buf.WriteString(strconv.FormatInt(int64(e.Commit.TxnRetry), 10))
|
|
}
|
|
buf.WriteString("}")
|
|
}
|
|
if e.LockKeys != nil {
|
|
if buf.Len() > 0 {
|
|
buf.WriteString(", ")
|
|
}
|
|
buf.WriteString("lock_keys: {")
|
|
if e.LockKeys.TotalTime > 0 {
|
|
buf.WriteString("time:")
|
|
buf.WriteString(FormatDuration(e.LockKeys.TotalTime))
|
|
}
|
|
if e.LockKeys.RegionNum > 0 {
|
|
buf.WriteString(", region:")
|
|
buf.WriteString(strconv.FormatInt(int64(e.LockKeys.RegionNum), 10))
|
|
}
|
|
if e.LockKeys.LockKeys > 0 {
|
|
buf.WriteString(", keys:")
|
|
buf.WriteString(strconv.FormatInt(int64(e.LockKeys.LockKeys), 10))
|
|
}
|
|
if e.LockKeys.ResolveLock.ResolveLockTime > 0 {
|
|
buf.WriteString(", resolve_lock:")
|
|
buf.WriteString(FormatDuration(time.Duration(e.LockKeys.ResolveLock.ResolveLockTime)))
|
|
}
|
|
e.LockKeys.Mu.Lock()
|
|
if e.LockKeys.BackoffTime > 0 {
|
|
buf.WriteString(", backoff: {time: ")
|
|
buf.WriteString(FormatDuration(time.Duration(e.LockKeys.BackoffTime)))
|
|
if len(e.LockKeys.Mu.BackoffTypes) > 0 {
|
|
buf.WriteString(", type: ")
|
|
e.formatBackoff(buf, e.LockKeys.Mu.BackoffTypes)
|
|
}
|
|
buf.WriteString("}")
|
|
}
|
|
if e.LockKeys.Mu.SlowestReqTotalTime > 0 {
|
|
buf.WriteString(", slowest_rpc: {total: ")
|
|
buf.WriteString(strconv.FormatFloat(e.LockKeys.Mu.SlowestReqTotalTime.Seconds(), 'f', 3, 64))
|
|
buf.WriteString("s, region_id: ")
|
|
buf.WriteString(strconv.FormatUint(e.LockKeys.Mu.SlowestRegion, 10))
|
|
buf.WriteString(", store: ")
|
|
buf.WriteString(e.LockKeys.Mu.SlowestStoreAddr)
|
|
buf.WriteString(", ")
|
|
buf.WriteString(e.LockKeys.Mu.SlowestExecDetails.String())
|
|
buf.WriteString("}")
|
|
}
|
|
e.LockKeys.Mu.Unlock()
|
|
if e.LockKeys.LockRPCTime > 0 {
|
|
buf.WriteString(", lock_rpc:")
|
|
buf.WriteString(time.Duration(e.LockKeys.LockRPCTime).String())
|
|
}
|
|
if e.LockKeys.LockRPCCount > 0 {
|
|
buf.WriteString(", rpc_count:")
|
|
buf.WriteString(strconv.FormatInt(e.LockKeys.LockRPCCount, 10))
|
|
}
|
|
if e.LockKeys.RetryCount > 0 {
|
|
buf.WriteString(", retry_count:")
|
|
buf.WriteString(strconv.FormatInt(int64(e.LockKeys.RetryCount), 10))
|
|
}
|
|
|
|
buf.WriteString("}")
|
|
}
|
|
return buf.String()
|
|
}
|
|
|
|
func (*RuntimeStatsWithCommit) formatBackoff(buf *bytes.Buffer, backoffTypes []string) {
|
|
if len(backoffTypes) == 0 {
|
|
return
|
|
}
|
|
tpMap := make(map[string]struct{})
|
|
tpArray := []string{}
|
|
for _, tpStr := range backoffTypes {
|
|
_, ok := tpMap[tpStr]
|
|
if ok {
|
|
continue
|
|
}
|
|
tpMap[tpStr] = struct{}{}
|
|
tpArray = append(tpArray, tpStr)
|
|
}
|
|
slices.Sort(tpArray)
|
|
buf.WriteByte('[')
|
|
for i, tp := range tpArray {
|
|
if i > 0 {
|
|
buf.WriteString(" ")
|
|
}
|
|
buf.WriteString(tp)
|
|
}
|
|
buf.WriteByte(']')
|
|
}
|
|
|
|
// RURuntimeStats is a wrapper of util.RUDetails,
|
|
// which implements the RuntimeStats interface.
|
|
type RURuntimeStats struct {
|
|
*util.RUDetails
|
|
}
|
|
|
|
// String implements the RuntimeStats interface.
|
|
func (e *RURuntimeStats) String() string {
|
|
if e.RUDetails != nil {
|
|
buf := bytes.NewBuffer(make([]byte, 0, 8))
|
|
buf.WriteString("RU:")
|
|
buf.WriteString(strconv.FormatFloat(e.RRU()+e.WRU(), 'f', 2, 64))
|
|
return buf.String()
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// Clone implements the RuntimeStats interface.
|
|
func (e *RURuntimeStats) Clone() RuntimeStats {
|
|
return &RURuntimeStats{RUDetails: e.RUDetails.Clone()}
|
|
}
|
|
|
|
// Merge implements the RuntimeStats interface.
|
|
func (e *RURuntimeStats) Merge(other RuntimeStats) {
|
|
if tmp, ok := other.(*RURuntimeStats); ok {
|
|
if e.RUDetails != nil {
|
|
e.RUDetails.Merge(tmp.RUDetails)
|
|
} else {
|
|
e.RUDetails = tmp.RUDetails.Clone()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Tp implements the RuntimeStats interface.
|
|
func (*RURuntimeStats) Tp() int {
|
|
return TpRURuntimeStats
|
|
}
|