From 5f624c01d8a38bb91b65a07275380fb754700a0f Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 23 Jul 2025 19:56:11 +0800 Subject: [PATCH] execdetails: move logic from execdetails.go to runtime_stats.go, tiflash_stats.go, and util.go by feature (#62581) close pingcap/tidb#62580 --- pkg/util/execdetails/BUILD.bazel | 2 + pkg/util/execdetails/execdetails.go | 1852 ------------------------- pkg/util/execdetails/runtime_stats.go | 999 +++++++++++++ pkg/util/execdetails/tiflash_stats.go | 742 ++++++++++ pkg/util/execdetails/util.go | 165 +++ 5 files changed, 1908 insertions(+), 1852 deletions(-) create mode 100644 pkg/util/execdetails/runtime_stats.go create mode 100644 pkg/util/execdetails/tiflash_stats.go diff --git a/pkg/util/execdetails/BUILD.bazel b/pkg/util/execdetails/BUILD.bazel index af276b160c..a9b1a05f12 100644 --- a/pkg/util/execdetails/BUILD.bazel +++ b/pkg/util/execdetails/BUILD.bazel @@ -4,6 +4,8 @@ go_library( name = "execdetails", srcs = [ "execdetails.go", + "runtime_stats.go", + "tiflash_stats.go", "util.go", ], importpath = "github.com/pingcap/tidb/pkg/util/execdetails", diff --git a/pkg/util/execdetails/execdetails.go b/pkg/util/execdetails/execdetails.go index 27dbc1b791..6868841a62 100644 --- a/pkg/util/execdetails/execdetails.go +++ b/pkg/util/execdetails/execdetails.go @@ -15,22 +15,13 @@ package execdetails import ( - "bytes" - "cmp" "fmt" - "maps" - "math" - "slices" "strconv" "strings" "sync" "sync/atomic" "time" - "github.com/influxdata/tdigest" - "github.com/pingcap/kvproto/pkg/resource_manager" - "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tipb/go-tipb" "github.com/tikv/client-go/v2/util" "go.uber.org/zap" ) @@ -623,1846 +614,3 @@ func (d *CopTasksDetails) ToZapFields() (fields []zap.Field) { fields = append(fields, zap.String("wait_max_addr", d.WaitTimeStats.MaxAddress)) return fields } - -type basicCopRuntimeStats struct { - loop int32 - rows int64 - threads int32 - procTimes Percentile[Duration] - // executor extra infos - tiflashStats *TiflashStats -} - -// TiflashStats contains tiflash execution stats. -type TiflashStats struct { - scanContext TiFlashScanContext - waitSummary TiFlashWaitSummary - networkSummary TiFlashNetworkTrafficSummary -} - -type canGetFloat64 interface { - GetFloat64() float64 -} - -// Int64 is a wrapper of int64 to implement the canGetFloat64 interface. -type Int64 int64 - -// GetFloat64 implements the canGetFloat64 interface. -func (i Int64) GetFloat64() float64 { return float64(i) } - -// Duration is a wrapper of time.Duration to implement the canGetFloat64 interface. -type Duration time.Duration - -// GetFloat64 implements the canGetFloat64 interface. -func (d Duration) GetFloat64() float64 { return float64(d) } - -// DurationWithAddr is a wrapper of time.Duration and string to implement the canGetFloat64 interface. -type DurationWithAddr struct { - D time.Duration - Addr string -} - -// GetFloat64 implements the canGetFloat64 interface. -func (d DurationWithAddr) GetFloat64() float64 { return float64(d.D) } - -// Percentile is a struct to calculate the percentile of a series of values. -type Percentile[valueType canGetFloat64] struct { - values []valueType - size int - isSorted bool - - minVal valueType - maxVal valueType - sumVal float64 - dt *tdigest.TDigest -} - -// Add adds a value to calculate the percentile. 
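The canGetFloat64 wrappers above (Int64, Duration, DurationWithAddr) are what let Percentile stay generic over its sample type; the Add implementation follows below. A minimal sketch of adding one more wrapper, with a hypothetical Bytes metric that is not part of this patch:

	// Bytes is a hypothetical wrapper of uint64 to implement the canGetFloat64 interface.
	type Bytes uint64

	// GetFloat64 implements the canGetFloat64 interface.
	func (b Bytes) GetFloat64() float64 { return float64(b) }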
-func (p *Percentile[valueType]) Add(value valueType) { - p.isSorted = false - p.sumVal += value.GetFloat64() - p.size++ - if p.dt == nil && len(p.values) == 0 { - p.minVal = value - p.maxVal = value - } else { - if value.GetFloat64() < p.minVal.GetFloat64() { - p.minVal = value - } - if value.GetFloat64() > p.maxVal.GetFloat64() { - p.maxVal = value - } - } - if p.dt == nil { - p.values = append(p.values, value) - if len(p.values) >= MaxDetailsNumsForOneQuery { - p.dt = tdigest.New() - for _, v := range p.values { - p.dt.Add(v.GetFloat64(), 1) - } - p.values = nil - } - return - } - p.dt.Add(value.GetFloat64(), 1) -} - -// GetPercentile returns the percentile `f` of the values. -func (p *Percentile[valueType]) GetPercentile(f float64) float64 { - if p.dt == nil { - if !p.isSorted { - p.isSorted = true - slices.SortFunc(p.values, func(i, j valueType) int { - return cmp.Compare(i.GetFloat64(), j.GetFloat64()) - }) - } - return p.values[int(float64(len(p.values))*f)].GetFloat64() - } - return p.dt.Quantile(f) -} - -// GetMax returns the max value. -func (p *Percentile[valueType]) GetMax() valueType { - return p.maxVal -} - -// GetMin returns the min value. -func (p *Percentile[valueType]) GetMin() valueType { - return p.minVal -} - -// MergePercentile merges two Percentile. -func (p *Percentile[valueType]) MergePercentile(p2 *Percentile[valueType]) { - p.isSorted = false - if p2.dt == nil { - for _, v := range p2.values { - p.Add(v) - } - return - } - p.sumVal += p2.sumVal - p.size += p2.size - if p.dt == nil { - p.dt = tdigest.New() - for _, v := range p.values { - p.dt.Add(v.GetFloat64(), 1) - } - p.values = nil - } - p.dt.AddCentroidList(p2.dt.Centroids()) -} - -// Size returns the size of the values. -func (p *Percentile[valueType]) Size() int { - return p.size -} - -// Sum returns the sum of the values. -func (p *Percentile[valueType]) Sum() float64 { - return p.sumVal -} - -// String implements the RuntimeStats interface. -func (e *basicCopRuntimeStats) String() string { - buf := bytes.NewBuffer(make([]byte, 0, 16)) - buf.WriteString("time:") - buf.WriteString(FormatDuration(time.Duration(e.procTimes.sumVal))) - buf.WriteString(", loops:") - buf.WriteString(strconv.Itoa(int(e.loop))) - if e.tiflashStats != nil { - buf.WriteString(", threads:") - buf.WriteString(strconv.Itoa(int(e.threads))) - if !e.tiflashStats.waitSummary.CanBeIgnored() { - buf.WriteString(", ") - buf.WriteString(e.tiflashStats.waitSummary.String()) - } - if !e.tiflashStats.networkSummary.Empty() { - buf.WriteString(", ") - buf.WriteString(e.tiflashStats.networkSummary.String()) - } - buf.WriteString(", ") - buf.WriteString(e.tiflashStats.scanContext.String()) - } - return buf.String() -} - -// Clone implements the RuntimeStats interface. -func (e *basicCopRuntimeStats) Clone() RuntimeStats { - stats := &basicCopRuntimeStats{ - loop: e.loop, - rows: e.rows, - threads: e.threads, - procTimes: e.procTimes, - } - if e.tiflashStats != nil { - stats.tiflashStats = &TiflashStats{ - scanContext: e.tiflashStats.scanContext.Clone(), - waitSummary: e.tiflashStats.waitSummary.Clone(), - networkSummary: e.tiflashStats.networkSummary.Clone(), - } - } - return stats -} - -// Merge implements the RuntimeStats interface. 
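With Add, GetPercentile, MergePercentile, Size, and Sum in place (the RuntimeStats Merge implementation follows below), a caller can drive Percentile as in this minimal sketch; the sample values are made up, and MaxDetailsNumsForOneQuery, defined elsewhere in this package, is the threshold past which exact samples are folded into the t-digest approximation:

	var procTimes Percentile[Duration]
	for _, d := range []time.Duration{3 * time.Millisecond, 9 * time.Millisecond, 27 * time.Millisecond} {
		procTimes.Add(Duration(d)) // exact until MaxDetailsNumsForOneQuery samples, then t-digest
	}
	p95 := time.Duration(procTimes.GetPercentile(0.95)) // approximate once the t-digest is in use
	avg := time.Duration(int64(procTimes.Sum()) / int64(procTimes.Size()))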
-func (e *basicCopRuntimeStats) Merge(rs RuntimeStats) { - tmp, ok := rs.(*basicCopRuntimeStats) - if !ok { - return - } - e.loop += tmp.loop - e.rows += tmp.rows - e.threads += tmp.threads - if tmp.procTimes.Size() > 0 { - e.procTimes.MergePercentile(&tmp.procTimes) - } - if tmp.tiflashStats != nil { - if e.tiflashStats == nil { - e.tiflashStats = &TiflashStats{} - } - e.tiflashStats.scanContext.Merge(tmp.tiflashStats.scanContext) - e.tiflashStats.waitSummary.Merge(tmp.tiflashStats.waitSummary) - e.tiflashStats.networkSummary.Merge(tmp.tiflashStats.networkSummary) - } -} - -// mergeExecSummary likes Merge, but it merges ExecutorExecutionSummary directly. -func (e *basicCopRuntimeStats) mergeExecSummary(summary *tipb.ExecutorExecutionSummary) { - e.loop += (int32(*summary.NumIterations)) - e.rows += (int64(*summary.NumProducedRows)) - e.threads += int32(summary.GetConcurrency()) - e.procTimes.Add(Duration(int64(*summary.TimeProcessedNs))) - if tiflashScanContext := summary.GetTiflashScanContext(); tiflashScanContext != nil { - if e.tiflashStats == nil { - e.tiflashStats = &TiflashStats{} - } - e.tiflashStats.scanContext.mergeExecSummary(tiflashScanContext) - } - if tiflashWaitSummary := summary.GetTiflashWaitSummary(); tiflashWaitSummary != nil { - if e.tiflashStats == nil { - e.tiflashStats = &TiflashStats{} - } - e.tiflashStats.waitSummary.mergeExecSummary(tiflashWaitSummary, *summary.TimeProcessedNs) - } - if tiflashNetworkSummary := summary.GetTiflashNetworkSummary(); tiflashNetworkSummary != nil { - if e.tiflashStats == nil { - e.tiflashStats = &TiflashStats{} - } - e.tiflashStats.networkSummary.mergeExecSummary(tiflashNetworkSummary) - } -} - -// Tp implements the RuntimeStats interface. -func (*basicCopRuntimeStats) Tp() int { - return TpBasicCopRunTimeStats -} - -// StmtCopRuntimeStats stores the cop runtime stats of the total statement -type StmtCopRuntimeStats struct { - // TiflashNetworkStats stats all mpp tasks' network traffic info, nil if no any mpp tasks' network traffic - TiflashNetworkStats *TiFlashNetworkTrafficSummary -} - -// mergeExecSummary merges ExecutorExecutionSummary into stmt cop runtime stats directly. -func (e *StmtCopRuntimeStats) mergeExecSummary(summary *tipb.ExecutorExecutionSummary) { - if tiflashNetworkSummary := summary.GetTiflashNetworkSummary(); tiflashNetworkSummary != nil { - if e.TiflashNetworkStats == nil { - e.TiflashNetworkStats = &TiFlashNetworkTrafficSummary{} - } - e.TiflashNetworkStats.mergeExecSummary(tiflashNetworkSummary) - } -} - -// CopRuntimeStats collects cop tasks' execution info. -type CopRuntimeStats struct { - // stats stores the runtime statistics of coprocessor tasks. - // The key of the map is the tikv-server address. Because a tikv-server can - // have many region leaders, several coprocessor tasks can be sent to the - // same tikv-server instance. We have to use a list to maintain all tasks - // executed on each instance. - stats basicCopRuntimeStats - scanDetail util.ScanDetail - timeDetail util.TimeDetail - storeType kv.StoreType -} - -// GetActRows return total rows of CopRuntimeStats. 
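mergeExecSummary above mixes nil-safe tipb getters (GetConcurrency, GetTiflashScanContext, and so on) with direct dereferences such as *summary.NumIterations, which assume the coprocessor always fills those fields; GetActRows follows below. A hedged sketch of the defensive variant of that read, using a hypothetical helper name:

	// loops reads NumIterations defensively instead of dereferencing it directly.
	func loops(summary *tipb.ExecutorExecutionSummary) int32 {
		if summary == nil || summary.NumIterations == nil {
			return 0
		}
		return int32(*summary.NumIterations)
	}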
-func (crs *CopRuntimeStats) GetActRows() int64 { - return crs.stats.rows -} - -// GetTasks return total tasks of CopRuntimeStats -func (crs *CopRuntimeStats) GetTasks() int32 { - return int32(crs.stats.procTimes.size) -} - -var zeroTimeDetail = util.TimeDetail{} - -func (crs *CopRuntimeStats) String() string { - procTimes := crs.stats.procTimes - totalTasks := procTimes.size - isTiFlashCop := crs.storeType == kv.TiFlash - buf := bytes.NewBuffer(make([]byte, 0, 16)) - { - printTiFlashSpecificInfo := func() { - if isTiFlashCop { - buf.WriteString(", ") - buf.WriteString("threads:") - buf.WriteString(strconv.Itoa(int(crs.stats.threads))) - buf.WriteString("}") - if crs.stats.tiflashStats != nil { - if !crs.stats.tiflashStats.waitSummary.CanBeIgnored() { - buf.WriteString(", ") - buf.WriteString(crs.stats.tiflashStats.waitSummary.String()) - } - if !crs.stats.tiflashStats.networkSummary.Empty() { - buf.WriteString(", ") - buf.WriteString(crs.stats.tiflashStats.networkSummary.String()) - } - if !crs.stats.tiflashStats.scanContext.Empty() { - buf.WriteString(", ") - buf.WriteString(crs.stats.tiflashStats.scanContext.String()) - } - } - } else { - buf.WriteString("}") - } - } - if totalTasks == 1 { - buf.WriteString(crs.storeType.Name()) - buf.WriteString("_task:{time:") - buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0)))) - buf.WriteString(", loops:") - buf.WriteString(strconv.Itoa(int(crs.stats.loop))) - printTiFlashSpecificInfo() - } else if totalTasks > 0 { - buf.WriteString(crs.storeType.Name()) - buf.WriteString("_task:{proc max:") - buf.WriteString(FormatDuration(time.Duration(procTimes.GetMax().GetFloat64()))) - buf.WriteString(", min:") - buf.WriteString(FormatDuration(time.Duration(procTimes.GetMin().GetFloat64()))) - buf.WriteString(", avg: ") - buf.WriteString(FormatDuration(time.Duration(int64(procTimes.Sum()) / int64(totalTasks)))) - buf.WriteString(", p80:") - buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0.8)))) - buf.WriteString(", p95:") - buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0.95)))) - buf.WriteString(", iters:") - buf.WriteString(strconv.Itoa(int(crs.stats.loop))) - buf.WriteString(", tasks:") - buf.WriteString(strconv.Itoa(totalTasks)) - printTiFlashSpecificInfo() - } - } - if !isTiFlashCop { - detail := crs.scanDetail.String() - if detail != "" { - buf.WriteString(", ") - buf.WriteString(detail) - } - if crs.timeDetail != zeroTimeDetail { - timeDetailStr := crs.timeDetail.String() - if timeDetailStr != "" { - buf.WriteString(", ") - buf.WriteString(timeDetailStr) - } - } - } - return buf.String() -} - -const ( - // TpBasicRuntimeStats is the tp for BasicRuntimeStats. - TpBasicRuntimeStats int = iota - // TpRuntimeStatsWithCommit is the tp for RuntimeStatsWithCommit. - TpRuntimeStatsWithCommit - // TpRuntimeStatsWithConcurrencyInfo is the tp for RuntimeStatsWithConcurrencyInfo. - TpRuntimeStatsWithConcurrencyInfo - // TpSnapshotRuntimeStats is the tp for SnapshotRuntimeStats. - TpSnapshotRuntimeStats - // TpHashJoinRuntimeStats is the tp for HashJoinRuntimeStats. - TpHashJoinRuntimeStats - // TpHashJoinRuntimeStatsV2 is the tp for hashJoinRuntimeStatsV2. - TpHashJoinRuntimeStatsV2 - // TpIndexLookUpJoinRuntimeStats is the tp for IndexLookUpJoinRuntimeStats. - TpIndexLookUpJoinRuntimeStats - // TpRuntimeStatsWithSnapshot is the tp for RuntimeStatsWithSnapshot. - TpRuntimeStatsWithSnapshot - // TpJoinRuntimeStats is the tp for JoinRuntimeStats. 
- TpJoinRuntimeStats - // TpSelectResultRuntimeStats is the tp for SelectResultRuntimeStats. - TpSelectResultRuntimeStats - // TpInsertRuntimeStat is the tp for InsertRuntimeStat - TpInsertRuntimeStat - // TpIndexLookUpRunTimeStats is the tp for IndexLookUpRunTimeStats - TpIndexLookUpRunTimeStats - // TpSlowQueryRuntimeStat is the tp for SlowQueryRuntimeStat - TpSlowQueryRuntimeStat - // TpHashAggRuntimeStat is the tp for HashAggRuntimeStat - TpHashAggRuntimeStat - // TpIndexMergeRunTimeStats is the tp for IndexMergeRunTimeStats - TpIndexMergeRunTimeStats - // TpBasicCopRunTimeStats is the tp for BasicCopRunTimeStats - TpBasicCopRunTimeStats - // TpUpdateRuntimeStats is the tp for UpdateRuntimeStats - TpUpdateRuntimeStats - // TpFKCheckRuntimeStats is the tp for FKCheckRuntimeStats - TpFKCheckRuntimeStats - // TpFKCascadeRuntimeStats is the tp for FKCascadeRuntimeStats - TpFKCascadeRuntimeStats - // TpRURuntimeStats is the tp for RURuntimeStats - TpRURuntimeStats -) - -// RuntimeStats is used to express the executor runtime information. -type RuntimeStats interface { - String() string - Merge(RuntimeStats) - Clone() RuntimeStats - Tp() int -} - -// TiFlashScanContext is used to express the table scan information in tiflash -type TiFlashScanContext struct { - dmfileDataScannedRows uint64 - dmfileDataSkippedRows uint64 - dmfileMvccScannedRows uint64 - dmfileMvccSkippedRows uint64 - dmfileLmFilterScannedRows uint64 - dmfileLmFilterSkippedRows uint64 - totalDmfileRsCheckMs uint64 - totalDmfileReadMs uint64 - totalBuildSnapshotMs uint64 - localRegions uint64 - remoteRegions uint64 - totalLearnerReadMs uint64 - disaggReadCacheHitBytes uint64 - disaggReadCacheMissBytes uint64 - segments uint64 - readTasks uint64 - deltaRows uint64 - deltaBytes uint64 - mvccInputRows uint64 - mvccInputBytes uint64 - mvccOutputRows uint64 - totalBuildBitmapMs uint64 - totalBuildInputStreamMs uint64 - staleReadRegions uint64 - minLocalStreamMs uint64 - maxLocalStreamMs uint64 - minRemoteStreamMs uint64 - maxRemoteStreamMs uint64 - regionsOfInstance map[string]uint64 - - // vector index related - - vectorIdxLoadFromS3 uint64 - vectorIdxLoadFromDisk uint64 - vectorIdxLoadFromCache uint64 - vectorIdxLoadTimeMs uint64 - vectorIdxSearchTimeMs uint64 - vectorIdxSearchVisitedNodes uint64 - vectorIdxSearchDiscardedNodes uint64 - vectorIdxReadVecTimeMs uint64 - vectorIdxReadOthersTimeMs uint64 - - // fts related - - ftsNFromInmemoryNoindex uint32 - ftsNFromTinyIndex uint32 - ftsNFromTinyNoindex uint32 - ftsNFromDmfIndex uint32 - ftsNFromDmfNoindex uint32 - ftsRowsFromInmemoryNoindex uint64 - ftsRowsFromTinyIndex uint64 - ftsRowsFromTinyNoindex uint64 - ftsRowsFromDmfIndex uint64 - ftsRowsFromDmfNoindex uint64 - ftsIdxLoadTotalMs uint64 - ftsIdxLoadFromCache uint32 - ftsIdxLoadFromColumnFile uint32 - ftsIdxLoadFromStableS3 uint32 - ftsIdxLoadFromStableDisk uint32 - ftsIdxSearchN uint32 - ftsIdxSearchTotalMs uint64 - ftsIdxDmSearchRows uint64 - ftsIdxDmTotalReadFtsMs uint64 - ftsIdxDmTotalReadOthersMs uint64 - ftsIdxTinySearchRows uint64 - ftsIdxTinyTotalReadFtsMs uint64 - ftsIdxTinyTotalReadOthersMs uint64 - ftsBruteTotalReadMs uint64 - ftsBruteTotalSearchMs uint64 - - // inverted index related - - invertedIdxLoadFromS3 uint32 - invertedIdxLoadFromDisk uint32 - invertedIdxLoadFromCache uint32 - invertedIdxLoadTimeMs uint64 - invertedIdxSearchTimeMs uint64 - invertedIdxSearchSkippedPacks uint32 - invertedIdxIndexedRows uint64 - invertedIdxSearchSelectedRows uint64 -} - -// Clone implements the deep copy of * 
TiFlashshScanContext -func (context *TiFlashScanContext) Clone() TiFlashScanContext { - newContext := TiFlashScanContext{ - dmfileDataScannedRows: context.dmfileDataScannedRows, - dmfileDataSkippedRows: context.dmfileDataSkippedRows, - dmfileMvccScannedRows: context.dmfileMvccScannedRows, - dmfileMvccSkippedRows: context.dmfileMvccSkippedRows, - dmfileLmFilterScannedRows: context.dmfileLmFilterScannedRows, - dmfileLmFilterSkippedRows: context.dmfileLmFilterSkippedRows, - totalDmfileRsCheckMs: context.totalDmfileRsCheckMs, - totalDmfileReadMs: context.totalDmfileReadMs, - totalBuildSnapshotMs: context.totalBuildSnapshotMs, - localRegions: context.localRegions, - remoteRegions: context.remoteRegions, - totalLearnerReadMs: context.totalLearnerReadMs, - disaggReadCacheHitBytes: context.disaggReadCacheHitBytes, - disaggReadCacheMissBytes: context.disaggReadCacheMissBytes, - segments: context.segments, - readTasks: context.readTasks, - deltaRows: context.deltaRows, - deltaBytes: context.deltaBytes, - mvccInputRows: context.mvccInputRows, - mvccInputBytes: context.mvccInputBytes, - mvccOutputRows: context.mvccOutputRows, - totalBuildBitmapMs: context.totalBuildBitmapMs, - totalBuildInputStreamMs: context.totalBuildInputStreamMs, - staleReadRegions: context.staleReadRegions, - minLocalStreamMs: context.minLocalStreamMs, - maxLocalStreamMs: context.maxLocalStreamMs, - minRemoteStreamMs: context.minRemoteStreamMs, - maxRemoteStreamMs: context.maxRemoteStreamMs, - regionsOfInstance: make(map[string]uint64), - - vectorIdxLoadFromS3: context.vectorIdxLoadFromS3, - vectorIdxLoadFromDisk: context.vectorIdxLoadFromDisk, - vectorIdxLoadFromCache: context.vectorIdxLoadFromCache, - vectorIdxLoadTimeMs: context.vectorIdxLoadTimeMs, - vectorIdxSearchTimeMs: context.vectorIdxSearchTimeMs, - vectorIdxSearchVisitedNodes: context.vectorIdxSearchVisitedNodes, - vectorIdxSearchDiscardedNodes: context.vectorIdxSearchDiscardedNodes, - vectorIdxReadVecTimeMs: context.vectorIdxReadVecTimeMs, - vectorIdxReadOthersTimeMs: context.vectorIdxReadOthersTimeMs, - - ftsNFromInmemoryNoindex: context.ftsNFromInmemoryNoindex, - ftsNFromTinyIndex: context.ftsNFromTinyIndex, - ftsNFromTinyNoindex: context.ftsNFromTinyNoindex, - ftsNFromDmfIndex: context.ftsNFromDmfIndex, - ftsNFromDmfNoindex: context.ftsNFromDmfNoindex, - ftsRowsFromInmemoryNoindex: context.ftsRowsFromInmemoryNoindex, - ftsRowsFromTinyIndex: context.ftsRowsFromTinyIndex, - ftsRowsFromTinyNoindex: context.ftsRowsFromTinyNoindex, - ftsRowsFromDmfIndex: context.ftsRowsFromDmfIndex, - ftsRowsFromDmfNoindex: context.ftsRowsFromDmfNoindex, - ftsIdxLoadTotalMs: context.ftsIdxLoadTotalMs, - ftsIdxLoadFromCache: context.ftsIdxLoadFromCache, - ftsIdxLoadFromColumnFile: context.ftsIdxLoadFromColumnFile, - ftsIdxLoadFromStableS3: context.ftsIdxLoadFromStableS3, - ftsIdxLoadFromStableDisk: context.ftsIdxLoadFromStableDisk, - ftsIdxSearchN: context.ftsIdxSearchN, - ftsIdxSearchTotalMs: context.ftsIdxSearchTotalMs, - ftsIdxDmSearchRows: context.ftsIdxDmSearchRows, - ftsIdxDmTotalReadFtsMs: context.ftsIdxDmTotalReadFtsMs, - ftsIdxDmTotalReadOthersMs: context.ftsIdxDmTotalReadOthersMs, - ftsIdxTinySearchRows: context.ftsIdxTinySearchRows, - ftsIdxTinyTotalReadFtsMs: context.ftsIdxTinyTotalReadFtsMs, - ftsIdxTinyTotalReadOthersMs: context.ftsIdxTinyTotalReadOthersMs, - ftsBruteTotalReadMs: context.ftsBruteTotalReadMs, - ftsBruteTotalSearchMs: context.ftsBruteTotalSearchMs, - - invertedIdxLoadFromS3: context.invertedIdxLoadFromS3, - invertedIdxLoadFromDisk: 
context.invertedIdxLoadFromDisk, - invertedIdxLoadFromCache: context.invertedIdxLoadFromCache, - invertedIdxLoadTimeMs: context.invertedIdxLoadTimeMs, - invertedIdxSearchTimeMs: context.invertedIdxSearchTimeMs, - invertedIdxSearchSkippedPacks: context.invertedIdxSearchSkippedPacks, - invertedIdxIndexedRows: context.invertedIdxIndexedRows, - invertedIdxSearchSelectedRows: context.invertedIdxSearchSelectedRows, - } - maps.Copy(newContext.regionsOfInstance, context.regionsOfInstance) - return newContext -} - -func (context *TiFlashScanContext) String() string { - var output []string - if context.vectorIdxLoadFromS3+context.vectorIdxLoadFromDisk+context.vectorIdxLoadFromCache > 0 { - var items []string - items = append(items, fmt.Sprintf("load:{total:%dms,from_s3:%d,from_disk:%d,from_cache:%d}", context.vectorIdxLoadTimeMs, context.vectorIdxLoadFromS3, context.vectorIdxLoadFromDisk, context.vectorIdxLoadFromCache)) - items = append(items, fmt.Sprintf("search:{total:%dms,visited_nodes:%d,discarded_nodes:%d}", context.vectorIdxSearchTimeMs, context.vectorIdxSearchVisitedNodes, context.vectorIdxSearchDiscardedNodes)) - items = append(items, fmt.Sprintf("read:{vec_total:%dms,others_total:%dms}", context.vectorIdxReadVecTimeMs, context.vectorIdxReadOthersTimeMs)) - output = append(output, "vector_idx:{"+strings.Join(items, ",")+"}") - } - if context.invertedIdxLoadFromS3+context.invertedIdxLoadFromDisk+context.invertedIdxLoadFromCache > 0 { - var items []string - items = append(items, fmt.Sprintf("load:{total:%dms,from_s3:%d,from_disk:%d,from_cache:%d}", context.invertedIdxLoadTimeMs, context.invertedIdxLoadFromS3, context.invertedIdxLoadFromDisk, context.invertedIdxLoadFromCache)) - items = append(items, fmt.Sprintf("search:{total:%dms,skipped_packs:%d,indexed_rows:%d,selected_rows:%d}", context.invertedIdxSearchTimeMs, context.invertedIdxSearchSkippedPacks, context.invertedIdxIndexedRows, context.invertedIdxSearchSelectedRows)) - output = append(output, "inverted_idx:{"+strings.Join(items, ",")+"}") - } - if context.ftsNFromInmemoryNoindex+context.ftsNFromTinyIndex+context.ftsNFromTinyNoindex+context.ftsNFromDmfIndex+context.ftsNFromDmfNoindex > 0 { - var items []string - items = append(items, fmt.Sprintf("hit_rows:{delta:%d,dmf:%d}", context.ftsRowsFromTinyIndex, context.ftsRowsFromDmfIndex)) - items = append(items, fmt.Sprintf("miss_rows:{mem:%d,delta:%d,dmf:%d}", context.ftsRowsFromInmemoryNoindex, context.ftsRowsFromTinyNoindex, context.ftsRowsFromDmfNoindex)) - items = append(items, fmt.Sprintf("idx_load:{total:%dms,from:{s3:%d,disk:%d,cache:%d}}", context.ftsIdxLoadTotalMs, context.ftsIdxLoadFromStableS3, context.ftsIdxLoadFromStableDisk+context.ftsIdxLoadFromColumnFile, context.ftsIdxLoadFromCache)) - avg := uint64(0) - if context.ftsIdxSearchN > 0 { - avg = context.ftsIdxSearchTotalMs / uint64(context.ftsIdxSearchN) - } - items = append(items, fmt.Sprintf("idx_search:{total:%dms,avg:%dms}", context.ftsIdxSearchTotalMs, avg)) - items = append(items, fmt.Sprintf("idx_read:{rows:%d,fts_total:%dms,others_total:%dms}", context.ftsIdxDmSearchRows+context.ftsIdxTinySearchRows, context.ftsIdxDmTotalReadFtsMs+context.ftsIdxTinyTotalReadFtsMs, context.ftsIdxDmTotalReadOthersMs+context.ftsIdxTinyTotalReadOthersMs)) - items = append(items, fmt.Sprintf("miss:{read:%dms,search:%dms}", context.ftsBruteTotalReadMs, context.ftsBruteTotalSearchMs)) - output = append(output, "fts:{"+strings.Join(items, ",")+"}") - } - - regionBalanceInfo := "none" - if len(context.regionsOfInstance) > 0 { - maxNum := 
uint64(0) - minNum := uint64(math.MaxUint64) - for _, v := range context.regionsOfInstance { - if v > maxNum { - maxNum = v - } - if v > 0 && v < minNum { - minNum = v - } - } - regionBalanceInfo = fmt.Sprintf("{instance_num: %d, max/min: %d/%d=%f}", - len(context.regionsOfInstance), - maxNum, - minNum, - float64(maxNum)/float64(minNum)) - } - dmfileDisaggInfo := "" - if context.disaggReadCacheHitBytes != 0 || context.disaggReadCacheMissBytes != 0 { - dmfileDisaggInfo = fmt.Sprintf(", disagg_cache_hit_bytes: %d, disagg_cache_miss_bytes: %d", - context.disaggReadCacheHitBytes, - context.disaggReadCacheMissBytes) - } - remoteStreamInfo := "" - if context.minRemoteStreamMs != 0 || context.maxRemoteStreamMs != 0 { - remoteStreamInfo = fmt.Sprintf("min_remote_stream:%dms, max_remote_stream:%dms, ", context.minRemoteStreamMs, context.maxRemoteStreamMs) - } - - // note: "tot" is short for "total" - output = append(output, fmt.Sprintf("tiflash_scan:{"+ - "mvcc_input_rows:%d, "+ - "mvcc_input_bytes:%d, "+ - "mvcc_output_rows:%d, "+ - "local_regions:%d, "+ - "remote_regions:%d, "+ - "tot_learner_read:%dms, "+ - "region_balance:%s, "+ - "delta_rows:%d, "+ - "delta_bytes:%d, "+ - "segments:%d, "+ - "stale_read_regions:%d, "+ - "tot_build_snapshot:%dms, "+ - "tot_build_bitmap:%dms, "+ - "tot_build_inputstream:%dms, "+ - "min_local_stream:%dms, "+ - "max_local_stream:%dms, "+ - "%s"+ // remote stream info - "dtfile:{"+ - "data_scanned_rows:%d, "+ - "data_skipped_rows:%d, "+ - "mvcc_scanned_rows:%d, "+ - "mvcc_skipped_rows:%d, "+ - "lm_filter_scanned_rows:%d, "+ - "lm_filter_skipped_rows:%d, "+ - "tot_rs_index_check:%dms, "+ - "tot_read:%dms"+ - "%s}"+ // Disagg cache info of DMFile - "}", - context.mvccInputRows, - context.mvccInputBytes, - context.mvccOutputRows, - context.localRegions, - context.remoteRegions, - context.totalLearnerReadMs, - regionBalanceInfo, - context.deltaRows, - context.deltaBytes, - context.segments, - context.staleReadRegions, - context.totalBuildSnapshotMs, - context.totalBuildBitmapMs, - context.totalBuildInputStreamMs, - context.minLocalStreamMs, - context.maxLocalStreamMs, - remoteStreamInfo, - context.dmfileDataScannedRows, - context.dmfileDataSkippedRows, - context.dmfileMvccScannedRows, - context.dmfileMvccSkippedRows, - context.dmfileLmFilterScannedRows, - context.dmfileLmFilterSkippedRows, - context.totalDmfileRsCheckMs, - context.totalDmfileReadMs, - dmfileDisaggInfo, - )) - - return strings.Join(output, ", ") -} - -// Merge make sum to merge the information in TiFlashScanContext -func (context *TiFlashScanContext) Merge(other TiFlashScanContext) { - context.dmfileDataScannedRows += other.dmfileDataScannedRows - context.dmfileDataSkippedRows += other.dmfileDataSkippedRows - context.dmfileMvccScannedRows += other.dmfileMvccScannedRows - context.dmfileMvccSkippedRows += other.dmfileMvccSkippedRows - context.dmfileLmFilterScannedRows += other.dmfileLmFilterScannedRows - context.dmfileLmFilterSkippedRows += other.dmfileLmFilterSkippedRows - context.totalDmfileRsCheckMs += other.totalDmfileRsCheckMs - context.totalDmfileReadMs += other.totalDmfileReadMs - context.totalBuildSnapshotMs += other.totalBuildSnapshotMs - context.localRegions += other.localRegions - context.remoteRegions += other.remoteRegions - context.totalLearnerReadMs += other.totalLearnerReadMs - context.disaggReadCacheHitBytes += other.disaggReadCacheHitBytes - context.disaggReadCacheMissBytes += other.disaggReadCacheMissBytes - context.segments += other.segments - context.readTasks += other.readTasks - 
context.deltaRows += other.deltaRows - context.deltaBytes += other.deltaBytes - context.mvccInputRows += other.mvccInputRows - context.mvccInputBytes += other.mvccInputBytes - context.mvccOutputRows += other.mvccOutputRows - context.totalBuildBitmapMs += other.totalBuildBitmapMs - context.totalBuildInputStreamMs += other.totalBuildInputStreamMs - context.staleReadRegions += other.staleReadRegions - - context.vectorIdxLoadFromS3 += other.vectorIdxLoadFromS3 - context.vectorIdxLoadFromDisk += other.vectorIdxLoadFromDisk - context.vectorIdxLoadFromCache += other.vectorIdxLoadFromCache - context.vectorIdxLoadTimeMs += other.vectorIdxLoadTimeMs - context.vectorIdxSearchTimeMs += other.vectorIdxSearchTimeMs - context.vectorIdxSearchVisitedNodes += other.vectorIdxSearchVisitedNodes - context.vectorIdxSearchDiscardedNodes += other.vectorIdxSearchDiscardedNodes - context.vectorIdxReadVecTimeMs += other.vectorIdxReadVecTimeMs - context.vectorIdxReadOthersTimeMs += other.vectorIdxReadOthersTimeMs - - context.ftsNFromInmemoryNoindex += other.ftsNFromInmemoryNoindex - context.ftsNFromTinyIndex += other.ftsNFromTinyIndex - context.ftsNFromTinyNoindex += other.ftsNFromTinyNoindex - context.ftsNFromDmfIndex += other.ftsNFromDmfIndex - context.ftsNFromDmfNoindex += other.ftsNFromDmfNoindex - context.ftsRowsFromInmemoryNoindex += other.ftsRowsFromInmemoryNoindex - context.ftsRowsFromTinyIndex += other.ftsRowsFromTinyIndex - context.ftsRowsFromTinyNoindex += other.ftsRowsFromTinyNoindex - context.ftsRowsFromDmfIndex += other.ftsRowsFromDmfIndex - context.ftsRowsFromDmfNoindex += other.ftsRowsFromDmfNoindex - context.ftsIdxLoadTotalMs += other.ftsIdxLoadTotalMs - context.ftsIdxLoadFromCache += other.ftsIdxLoadFromCache - context.ftsIdxLoadFromColumnFile += other.ftsIdxLoadFromColumnFile - context.ftsIdxLoadFromStableS3 += other.ftsIdxLoadFromStableS3 - context.ftsIdxLoadFromStableDisk += other.ftsIdxLoadFromStableDisk - context.ftsIdxSearchN += other.ftsIdxSearchN - context.ftsIdxSearchTotalMs += other.ftsIdxSearchTotalMs - context.ftsIdxDmSearchRows += other.ftsIdxDmSearchRows - context.ftsIdxDmTotalReadFtsMs += other.ftsIdxDmTotalReadFtsMs - context.ftsIdxDmTotalReadOthersMs += other.ftsIdxDmTotalReadOthersMs - context.ftsIdxTinySearchRows += other.ftsIdxTinySearchRows - context.ftsIdxTinyTotalReadFtsMs += other.ftsIdxTinyTotalReadFtsMs - context.ftsIdxTinyTotalReadOthersMs += other.ftsIdxTinyTotalReadOthersMs - context.ftsBruteTotalReadMs += other.ftsBruteTotalReadMs - context.ftsBruteTotalSearchMs += other.ftsBruteTotalSearchMs - - context.invertedIdxLoadFromS3 += other.invertedIdxLoadFromS3 - context.invertedIdxLoadFromDisk += other.invertedIdxLoadFromDisk - context.invertedIdxLoadFromCache += other.invertedIdxLoadFromCache - context.invertedIdxLoadTimeMs += other.invertedIdxLoadTimeMs - context.invertedIdxSearchTimeMs += other.invertedIdxSearchTimeMs - context.invertedIdxSearchSkippedPacks += other.invertedIdxSearchSkippedPacks - context.invertedIdxIndexedRows += other.invertedIdxIndexedRows - context.invertedIdxSearchSelectedRows += other.invertedIdxSearchSelectedRows - - if context.minLocalStreamMs == 0 || other.minLocalStreamMs < context.minLocalStreamMs { - context.minLocalStreamMs = other.minLocalStreamMs - } - if other.maxLocalStreamMs > context.maxLocalStreamMs { - context.maxLocalStreamMs = other.maxLocalStreamMs - } - if context.minRemoteStreamMs == 0 || other.minRemoteStreamMs < context.minRemoteStreamMs { - context.minRemoteStreamMs = other.minRemoteStreamMs - } - if other.maxRemoteStreamMs > 
context.maxRemoteStreamMs { - context.maxRemoteStreamMs = other.maxRemoteStreamMs - } - - if context.regionsOfInstance == nil { - context.regionsOfInstance = make(map[string]uint64) - } - for k, v := range other.regionsOfInstance { - context.regionsOfInstance[k] += v - } -} - -func (context *TiFlashScanContext) mergeExecSummary(summary *tipb.TiFlashScanContext) { - if summary == nil { - return - } - context.dmfileDataScannedRows += summary.GetDmfileDataScannedRows() - context.dmfileDataSkippedRows += summary.GetDmfileDataSkippedRows() - context.dmfileMvccScannedRows += summary.GetDmfileMvccScannedRows() - context.dmfileMvccSkippedRows += summary.GetDmfileMvccSkippedRows() - context.dmfileLmFilterScannedRows += summary.GetDmfileLmFilterScannedRows() - context.dmfileLmFilterSkippedRows += summary.GetDmfileLmFilterSkippedRows() - context.totalDmfileRsCheckMs += summary.GetTotalDmfileRsCheckMs() - context.totalDmfileReadMs += summary.GetTotalDmfileReadMs() - context.totalBuildSnapshotMs += summary.GetTotalBuildSnapshotMs() - context.localRegions += summary.GetLocalRegions() - context.remoteRegions += summary.GetRemoteRegions() - context.totalLearnerReadMs += summary.GetTotalLearnerReadMs() - context.disaggReadCacheHitBytes += summary.GetDisaggReadCacheHitBytes() - context.disaggReadCacheMissBytes += summary.GetDisaggReadCacheMissBytes() - context.segments += summary.GetSegments() - context.readTasks += summary.GetReadTasks() - context.deltaRows += summary.GetDeltaRows() - context.deltaBytes += summary.GetDeltaBytes() - context.mvccInputRows += summary.GetMvccInputRows() - context.mvccInputBytes += summary.GetMvccInputBytes() - context.mvccOutputRows += summary.GetMvccOutputRows() - context.totalBuildBitmapMs += summary.GetTotalBuildBitmapMs() - context.totalBuildInputStreamMs += summary.GetTotalBuildInputstreamMs() - context.staleReadRegions += summary.GetStaleReadRegions() - - context.vectorIdxLoadFromS3 += summary.GetVectorIdxLoadFromS3() - context.vectorIdxLoadFromDisk += summary.GetVectorIdxLoadFromDisk() - context.vectorIdxLoadFromCache += summary.GetVectorIdxLoadFromCache() - context.vectorIdxLoadTimeMs += summary.GetVectorIdxLoadTimeMs() - context.vectorIdxSearchTimeMs += summary.GetVectorIdxSearchTimeMs() - context.vectorIdxSearchVisitedNodes += summary.GetVectorIdxSearchVisitedNodes() - context.vectorIdxSearchDiscardedNodes += summary.GetVectorIdxSearchDiscardedNodes() - context.vectorIdxReadVecTimeMs += summary.GetVectorIdxReadVecTimeMs() - context.vectorIdxReadOthersTimeMs += summary.GetVectorIdxReadOthersTimeMs() - - context.ftsNFromInmemoryNoindex += summary.GetFtsNFromInmemoryNoindex() - context.ftsNFromTinyIndex += summary.GetFtsNFromTinyIndex() - context.ftsNFromTinyNoindex += summary.GetFtsNFromTinyNoindex() - context.ftsNFromDmfIndex += summary.GetFtsNFromDmfIndex() - context.ftsNFromDmfNoindex += summary.GetFtsNFromDmfNoindex() - context.ftsRowsFromInmemoryNoindex += summary.GetFtsRowsFromInmemoryNoindex() - context.ftsRowsFromTinyIndex += summary.GetFtsRowsFromTinyIndex() - context.ftsRowsFromTinyNoindex += summary.GetFtsRowsFromTinyNoindex() - context.ftsRowsFromDmfIndex += summary.GetFtsRowsFromDmfIndex() - context.ftsRowsFromDmfNoindex += summary.GetFtsRowsFromDmfNoindex() - context.ftsIdxLoadTotalMs += summary.GetFtsIdxLoadTotalMs() - context.ftsIdxLoadFromCache += summary.GetFtsIdxLoadFromCache() - context.ftsIdxLoadFromColumnFile += summary.GetFtsIdxLoadFromColumnFile() - context.ftsIdxLoadFromStableS3 += summary.GetFtsIdxLoadFromStableS3() - 
context.ftsIdxLoadFromStableDisk += summary.GetFtsIdxLoadFromStableDisk() - context.ftsIdxSearchN += summary.GetFtsIdxSearchN() - context.ftsIdxSearchTotalMs += summary.GetFtsIdxSearchTotalMs() - context.ftsIdxDmSearchRows += summary.GetFtsIdxDmSearchRows() - context.ftsIdxDmTotalReadFtsMs += summary.GetFtsIdxDmTotalReadFtsMs() - context.ftsIdxDmTotalReadOthersMs += summary.GetFtsIdxDmTotalReadOthersMs() - context.ftsIdxTinySearchRows += summary.GetFtsIdxTinySearchRows() - context.ftsIdxTinyTotalReadFtsMs += summary.GetFtsIdxTinyTotalReadFtsMs() - context.ftsIdxTinyTotalReadOthersMs += summary.GetFtsIdxTinyTotalReadOthersMs() - context.ftsBruteTotalReadMs += summary.GetFtsBruteTotalReadMs() - context.ftsBruteTotalSearchMs += summary.GetFtsBruteTotalSearchMs() - - context.invertedIdxLoadFromS3 += summary.GetInvertedIdxLoadFromS3() - context.invertedIdxLoadFromDisk += summary.GetInvertedIdxLoadFromDisk() - context.invertedIdxLoadFromCache += summary.GetInvertedIdxLoadFromCache() - context.invertedIdxLoadTimeMs += summary.GetInvertedIdxLoadTimeMs() - context.invertedIdxSearchTimeMs += summary.GetInvertedIdxSearchTimeMs() - context.invertedIdxSearchSkippedPacks += summary.GetInvertedIdxSearchSkippedPacks() - context.invertedIdxIndexedRows += summary.GetInvertedIdxIndexedRows() - context.invertedIdxSearchSelectedRows += summary.GetInvertedIdxSearchSelectedRows() - - if context.minLocalStreamMs == 0 || summary.GetMinLocalStreamMs() < context.minLocalStreamMs { - context.minLocalStreamMs = summary.GetMinLocalStreamMs() - } - if summary.GetMaxLocalStreamMs() > context.maxLocalStreamMs { - context.maxLocalStreamMs = summary.GetMaxLocalStreamMs() - } - if context.minRemoteStreamMs == 0 || summary.GetMinRemoteStreamMs() < context.minRemoteStreamMs { - context.minRemoteStreamMs = summary.GetMinRemoteStreamMs() - } - if summary.GetMaxRemoteStreamMs() > context.maxRemoteStreamMs { - context.maxRemoteStreamMs = summary.GetMaxRemoteStreamMs() - } - - if context.regionsOfInstance == nil { - context.regionsOfInstance = make(map[string]uint64, len(summary.GetRegionsOfInstance())) - } - for _, instance := range summary.GetRegionsOfInstance() { - context.regionsOfInstance[instance.GetInstanceId()] += instance.GetRegionNum() - } -} - -// Empty check whether TiFlashScanContext is Empty, if scan no pack and skip no pack, we regard it as empty -func (context *TiFlashScanContext) Empty() bool { - res := context.dmfileDataScannedRows == 0 && - context.dmfileDataSkippedRows == 0 && - context.dmfileMvccScannedRows == 0 && - context.dmfileMvccSkippedRows == 0 && - context.dmfileLmFilterScannedRows == 0 && - context.dmfileLmFilterSkippedRows == 0 && - context.localRegions == 0 && - context.remoteRegions == 0 && - context.vectorIdxLoadFromDisk == 0 && - context.vectorIdxLoadFromCache == 0 && - context.vectorIdxLoadFromS3 == 0 && - context.invertedIdxLoadFromDisk == 0 && - context.invertedIdxLoadFromCache == 0 && - context.invertedIdxLoadFromS3 == 0 && - context.ftsNFromInmemoryNoindex == 0 && - context.ftsNFromTinyIndex == 0 && - context.ftsNFromTinyNoindex == 0 && - context.ftsNFromDmfIndex == 0 && - context.ftsNFromDmfNoindex == 0 - return res -} - -// TiFlashWaitSummary is used to express all kinds of wait information in tiflash -type TiFlashWaitSummary struct { - // keep execution time to do merge work, always record the wait time with largest execution time - executionTime uint64 - minTSOWaitTime uint64 - pipelineBreakerWaitTime uint64 - pipelineQueueWaitTime uint64 -} - -// Clone implements the deep copy of * 
TiFlashWaitSummary -func (waitSummary *TiFlashWaitSummary) Clone() TiFlashWaitSummary { - newSummary := TiFlashWaitSummary{ - executionTime: waitSummary.executionTime, - minTSOWaitTime: waitSummary.minTSOWaitTime, - pipelineBreakerWaitTime: waitSummary.pipelineBreakerWaitTime, - pipelineQueueWaitTime: waitSummary.pipelineQueueWaitTime, - } - return newSummary -} - -// String dumps TiFlashWaitSummary info as string -func (waitSummary *TiFlashWaitSummary) String() string { - if waitSummary.CanBeIgnored() { - return "" - } - buf := bytes.NewBuffer(make([]byte, 0, 32)) - buf.WriteString("tiflash_wait: {") - empty := true - if waitSummary.minTSOWaitTime >= uint64(time.Millisecond) { - buf.WriteString("minTSO_wait: ") - buf.WriteString(strconv.FormatInt(time.Duration(waitSummary.minTSOWaitTime).Milliseconds(), 10)) - buf.WriteString("ms") - empty = false - } - if waitSummary.pipelineBreakerWaitTime >= uint64(time.Millisecond) { - if !empty { - buf.WriteString(", ") - } - buf.WriteString("pipeline_breaker_wait: ") - buf.WriteString(strconv.FormatInt(time.Duration(waitSummary.pipelineBreakerWaitTime).Milliseconds(), 10)) - buf.WriteString("ms") - empty = false - } - if waitSummary.pipelineQueueWaitTime >= uint64(time.Millisecond) { - if !empty { - buf.WriteString(", ") - } - buf.WriteString("pipeline_queue_wait: ") - buf.WriteString(strconv.FormatInt(time.Duration(waitSummary.pipelineQueueWaitTime).Milliseconds(), 10)) - buf.WriteString("ms") - } - buf.WriteString("}") - return buf.String() -} - -// Merge make sum to merge the information in TiFlashWaitSummary -func (waitSummary *TiFlashWaitSummary) Merge(other TiFlashWaitSummary) { - if waitSummary.executionTime < other.executionTime { - waitSummary.executionTime = other.executionTime - waitSummary.minTSOWaitTime = other.minTSOWaitTime - waitSummary.pipelineBreakerWaitTime = other.pipelineBreakerWaitTime - waitSummary.pipelineQueueWaitTime = other.pipelineQueueWaitTime - } -} - -func (waitSummary *TiFlashWaitSummary) mergeExecSummary(summary *tipb.TiFlashWaitSummary, executionTime uint64) { - if summary == nil { - return - } - if waitSummary.executionTime < executionTime { - waitSummary.executionTime = executionTime - waitSummary.minTSOWaitTime = summary.GetMinTSOWaitNs() - waitSummary.pipelineBreakerWaitTime = summary.GetPipelineBreakerWaitNs() - waitSummary.pipelineQueueWaitTime = summary.GetPipelineQueueWaitNs() - } -} - -// CanBeIgnored check whether TiFlashWaitSummary can be ignored, not all tidb executors have significant tiflash wait summary -func (waitSummary *TiFlashWaitSummary) CanBeIgnored() bool { - res := waitSummary.minTSOWaitTime < uint64(time.Millisecond) && - waitSummary.pipelineBreakerWaitTime < uint64(time.Millisecond) && - waitSummary.pipelineQueueWaitTime < uint64(time.Millisecond) - return res -} - -// TiFlashNetworkTrafficSummary is used to express network traffic in tiflash -type TiFlashNetworkTrafficSummary struct { - innerZoneSendBytes uint64 - interZoneSendBytes uint64 - innerZoneReceiveBytes uint64 - interZoneReceiveBytes uint64 -} - -// UpdateTiKVExecDetails update tikvDetails with TiFlashNetworkTrafficSummary's values -func (networkTraffic *TiFlashNetworkTrafficSummary) UpdateTiKVExecDetails(tikvDetails *util.ExecDetails) { - if tikvDetails == nil { - return - } - tikvDetails.UnpackedBytesSentMPPCrossZone += int64(networkTraffic.interZoneSendBytes) - tikvDetails.UnpackedBytesSentMPPTotal += int64(networkTraffic.interZoneSendBytes) - tikvDetails.UnpackedBytesSentMPPTotal += int64(networkTraffic.innerZoneSendBytes) - - 
tikvDetails.UnpackedBytesReceivedMPPCrossZone += int64(networkTraffic.interZoneReceiveBytes) - tikvDetails.UnpackedBytesReceivedMPPTotal += int64(networkTraffic.interZoneReceiveBytes) - tikvDetails.UnpackedBytesReceivedMPPTotal += int64(networkTraffic.innerZoneReceiveBytes) -} - -// Clone implements the deep copy of * TiFlashNetworkTrafficSummary -func (networkTraffic *TiFlashNetworkTrafficSummary) Clone() TiFlashNetworkTrafficSummary { - newSummary := TiFlashNetworkTrafficSummary{ - innerZoneSendBytes: networkTraffic.innerZoneSendBytes, - interZoneSendBytes: networkTraffic.interZoneSendBytes, - innerZoneReceiveBytes: networkTraffic.innerZoneReceiveBytes, - interZoneReceiveBytes: networkTraffic.interZoneReceiveBytes, - } - return newSummary -} - -// Empty check whether TiFlashNetworkTrafficSummary is Empty, if no any network traffic, we regard it as empty -func (networkTraffic *TiFlashNetworkTrafficSummary) Empty() bool { - res := networkTraffic.innerZoneSendBytes == 0 && - networkTraffic.interZoneSendBytes == 0 && - networkTraffic.innerZoneReceiveBytes == 0 && - networkTraffic.interZoneReceiveBytes == 0 - return res -} - -// String dumps TiFlashNetworkTrafficSummary info as string -func (networkTraffic *TiFlashNetworkTrafficSummary) String() string { - buf := bytes.NewBuffer(make([]byte, 0, 32)) - buf.WriteString("tiflash_network: {") - empty := true - if networkTraffic.innerZoneSendBytes != 0 { - buf.WriteString("inner_zone_send_bytes: ") - buf.WriteString(strconv.FormatInt(int64(networkTraffic.innerZoneSendBytes), 10)) - empty = false - } - if networkTraffic.interZoneSendBytes != 0 { - if !empty { - buf.WriteString(", ") - } - buf.WriteString("inter_zone_send_bytes: ") - buf.WriteString(strconv.FormatInt(int64(networkTraffic.interZoneSendBytes), 10)) - empty = false - } - if networkTraffic.innerZoneReceiveBytes != 0 { - if !empty { - buf.WriteString(", ") - } - buf.WriteString("inner_zone_receive_bytes: ") - buf.WriteString(strconv.FormatInt(int64(networkTraffic.innerZoneReceiveBytes), 10)) - empty = false - } - if networkTraffic.interZoneReceiveBytes != 0 { - if !empty { - buf.WriteString(", ") - } - buf.WriteString("inter_zone_receive_bytes: ") - buf.WriteString(strconv.FormatInt(int64(networkTraffic.interZoneReceiveBytes), 10)) - } - buf.WriteString("}") - return buf.String() -} - -// Merge make sum to merge the information in TiFlashNetworkTrafficSummary -func (networkTraffic *TiFlashNetworkTrafficSummary) Merge(other TiFlashNetworkTrafficSummary) { - networkTraffic.innerZoneSendBytes += other.innerZoneSendBytes - networkTraffic.interZoneSendBytes += other.interZoneSendBytes - networkTraffic.innerZoneReceiveBytes += other.innerZoneReceiveBytes - networkTraffic.interZoneReceiveBytes += other.interZoneReceiveBytes -} - -func (networkTraffic *TiFlashNetworkTrafficSummary) mergeExecSummary(summary *tipb.TiFlashNetWorkSummary) { - if summary == nil { - return - } - networkTraffic.innerZoneSendBytes += *summary.InnerZoneSendBytes - networkTraffic.interZoneSendBytes += *summary.InterZoneSendBytes - networkTraffic.innerZoneReceiveBytes += *summary.InnerZoneReceiveBytes - networkTraffic.interZoneReceiveBytes += *summary.InterZoneReceiveBytes -} - -// GetInterZoneTrafficBytes returns the inter zone network traffic bytes involved -// between tiflash instances. -func (networkTraffic *TiFlashNetworkTrafficSummary) GetInterZoneTrafficBytes() uint64 { - // NOTE: we only count the inter zone sent bytes here because tiflash count the traffic bytes - // of all sub request. 
For each sub request, both sides count the send and recv traffic.
-	// So here, we only use the send bytes as the overall traffic to avoid counting the traffic twice.
-	// While this statistics logic seems a bit weird, it is the tiflash side's decision.
-	return networkTraffic.interZoneSendBytes
-}
-
-// BasicRuntimeStats is the basic runtime stats.
-type BasicRuntimeStats struct {
-	// the count of executors with the same id
-	executorCount atomic.Int32
-	// executor's Next() called times.
-	loop atomic.Int32
-	// executor consume time, including open, next, and close time.
-	consume atomic.Int64
-	// executor open time.
-	open atomic.Int64
-	// executor close time.
-	close atomic.Int64
-	// executor return row count.
-	rows atomic.Int64
-}
-
-// GetActRows returns the total rows of BasicRuntimeStats.
-func (e *BasicRuntimeStats) GetActRows() int64 {
-	return e.rows.Load()
-}
-
-// Clone implements the RuntimeStats interface.
-// BasicRuntimeStats shouldn't implement the Clone interface because all executors with the same executor_id
-// should share the same BasicRuntimeStats; duplicated BasicRuntimeStats easily cause mistakes.
-func (*BasicRuntimeStats) Clone() RuntimeStats {
-	panic("BasicRuntimeStats should not implement Clone function")
-}
-
-// Merge implements the RuntimeStats interface.
-func (e *BasicRuntimeStats) Merge(rs RuntimeStats) {
-	tmp, ok := rs.(*BasicRuntimeStats)
-	if !ok {
-		return
-	}
-	e.loop.Add(tmp.loop.Load())
-	e.consume.Add(tmp.consume.Load())
-	e.open.Add(tmp.open.Load())
-	e.close.Add(tmp.close.Load())
-	e.rows.Add(tmp.rows.Load())
-}
-
-// Tp implements the RuntimeStats interface.
-func (*BasicRuntimeStats) Tp() int {
-	return TpBasicRuntimeStats
-}
-
-// RootRuntimeStats is the executor runtime stats that combines multiple runtime stats.
-type RootRuntimeStats struct {
-	basic    *BasicRuntimeStats
-	groupRss []RuntimeStats
-}
-
-// NewRootRuntimeStats returns a new RootRuntimeStats
-func NewRootRuntimeStats() *RootRuntimeStats {
-	return &RootRuntimeStats{}
-}
-
-// GetActRows returns the total rows of RootRuntimeStats.
-func (e *RootRuntimeStats) GetActRows() int64 {
-	if e.basic == nil {
-		return 0
-	}
-	return e.basic.rows.Load()
-}
-
-// MergeStats merges stats in the RootRuntimeStats and returns the stats suitable for display directly.
-func (e *RootRuntimeStats) MergeStats() (basic *BasicRuntimeStats, groups []RuntimeStats) {
-	return e.basic, e.groupRss
-}
-
-// String implements the RuntimeStats interface.
-func (e *RootRuntimeStats) String() string {
-	basic, groups := e.MergeStats()
-	strs := make([]string, 0, len(groups)+1)
-	if basic != nil {
-		strs = append(strs, basic.String())
-	}
-	for _, group := range groups {
-		str := group.String()
-		if len(str) > 0 {
-			strs = append(strs, str)
-		}
-	}
-	return strings.Join(strs, ", ")
-}
-
-// Record records executor's execution.
-func (e *BasicRuntimeStats) Record(d time.Duration, rowNum int) {
-	e.loop.Add(1)
-	e.consume.Add(int64(d))
-	e.rows.Add(int64(rowNum))
-}
-
-// RecordOpen records executor's open time.
-func (e *BasicRuntimeStats) RecordOpen(d time.Duration) {
-	e.consume.Add(int64(d))
-	e.open.Add(int64(d))
-}
-
-// RecordClose records executor's close time.
-func (e *BasicRuntimeStats) RecordClose(d time.Duration) {
-	e.consume.Add(int64(d))
-	e.close.Add(int64(d))
-}
-
-// SetRowNum sets the row num.
-func (e *BasicRuntimeStats) SetRowNum(rowNum int64) {
-	e.rows.Store(rowNum)
-}
-
-// String implements the RuntimeStats interface.
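Record, RecordOpen, and RecordClose above are the whole write path of BasicRuntimeStats; the String renderer follows below. A sketch of how an executor loop might feed them, with a hypothetical wrapper that is not part of this patch:

	// nextWithStats times one Next() call and records it.
	func nextWithStats(stats *BasicRuntimeStats, next func() (rows int, err error)) error {
		start := time.Now()
		rows, err := next()
		stats.Record(time.Since(start), rows) // bumps the loop count, consume time, and row count
		return err
	}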
-func (e *BasicRuntimeStats) String() string {
-	if e == nil {
-		return ""
-	}
-	var str strings.Builder
-	timePrefix := ""
-	if e.executorCount.Load() > 1 {
-		timePrefix = "total_"
-	}
-	totalTime := e.consume.Load()
-	openTime := e.open.Load()
-	closeTime := e.close.Load()
-	str.WriteString(timePrefix)
-	str.WriteString("time:")
-	str.WriteString(FormatDuration(time.Duration(totalTime)))
-	str.WriteString(", ")
-	str.WriteString(timePrefix)
-	str.WriteString("open:")
-	str.WriteString(FormatDuration(time.Duration(openTime)))
-	str.WriteString(", ")
-	str.WriteString(timePrefix)
-	str.WriteString("close:")
-	str.WriteString(FormatDuration(time.Duration(closeTime)))
-	str.WriteString(", loops:")
-	str.WriteString(strconv.FormatInt(int64(e.loop.Load()), 10))
-	return str.String()
-}
-
-// GetTime gets the total time as an int64.
-func (e *BasicRuntimeStats) GetTime() int64 {
-	return e.consume.Load()
-}
-
-// RuntimeStatsColl collects executors' execution info.
-type RuntimeStatsColl struct {
-	rootStats    map[int]*RootRuntimeStats
-	copStats     map[int]*CopRuntimeStats
-	stmtCopStats StmtCopRuntimeStats
-	mu           sync.Mutex
-}
-
-// NewRuntimeStatsColl creates a new executor collector.
-// Reuse the object to reduce allocation when *RuntimeStatsColl is not nil.
-func NewRuntimeStatsColl(reuse *RuntimeStatsColl) *RuntimeStatsColl {
-	if reuse != nil {
-		// Reusing the maps is cheaper than creating new map objects.
-		// The Go compiler optimizes this cleanup pattern into a clearmap() call.
-		reuse.mu.Lock()
-		defer reuse.mu.Unlock()
-		for k := range reuse.rootStats {
-			delete(reuse.rootStats, k)
-		}
-		for k := range reuse.copStats {
-			delete(reuse.copStats, k)
-		}
-		return reuse
-	}
-	return &RuntimeStatsColl{
-		rootStats: make(map[int]*RootRuntimeStats),
-		copStats:  make(map[int]*CopRuntimeStats),
-	}
-}
-
-// RegisterStats registers execStat for an executor.
-func (e *RuntimeStatsColl) RegisterStats(planID int, info RuntimeStats) {
-	e.mu.Lock()
-	defer e.mu.Unlock()
-	stats, ok := e.rootStats[planID]
-	if !ok {
-		stats = NewRootRuntimeStats()
-		e.rootStats[planID] = stats
-	}
-	tp := info.Tp()
-	found := false
-	for _, rss := range stats.groupRss {
-		if rss.Tp() == tp {
-			rss.Merge(info)
-			found = true
-			break
-		}
-	}
-	if !found {
-		stats.groupRss = append(stats.groupRss, info)
-	}
-}
-
-// GetBasicRuntimeStats gets basicRuntimeStats for an executor.
-// When rootStat or rootStat's basicRuntimeStats is nil, the behavior is decided by the initNewExecutorStats argument:
-// 1. If true, it creates a new one and increases basicRuntimeStats' executorCount.
-// 2. Else, it returns nil.
-func (e *RuntimeStatsColl) GetBasicRuntimeStats(planID int, initNewExecutorStats bool) *BasicRuntimeStats {
-	e.mu.Lock()
-	defer e.mu.Unlock()
-	stats, ok := e.rootStats[planID]
-	if !ok && initNewExecutorStats {
-		stats = NewRootRuntimeStats()
-		e.rootStats[planID] = stats
-	}
-	if stats == nil {
-		return nil
-	}
-
-	if stats.basic == nil && initNewExecutorStats {
-		stats.basic = &BasicRuntimeStats{}
-		stats.basic.executorCount.Add(1)
-	} else if stats.basic != nil && initNewExecutorStats {
-		stats.basic.executorCount.Add(1)
-	}
-	return stats.basic
-}
-
-// GetStmtCopRuntimeStats gets the statement-level cop runtime stats.
-func (e *RuntimeStatsColl) GetStmtCopRuntimeStats() StmtCopRuntimeStats {
-	return e.stmtCopStats
-}
-
-// GetRootStats gets execStat for an executor.
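The reuse path of NewRuntimeStatsColl above clears and recycles the two maps instead of reallocating them; GetRootStats follows below. A sketch of the intended calling pattern, with hypothetical statement-loop names:

	var coll *RuntimeStatsColl
	for _, stmt := range stmts {
		coll = NewRuntimeStatsColl(coll) // nil on the first pass allocates; later passes clear and reuse
		execute(stmt, coll)
	}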
-func (e *RuntimeStatsColl) GetRootStats(planID int) *RootRuntimeStats { - e.mu.Lock() - defer e.mu.Unlock() - runtimeStats, exists := e.rootStats[planID] - if !exists { - runtimeStats = NewRootRuntimeStats() - e.rootStats[planID] = runtimeStats - } - return runtimeStats -} - -// GetPlanActRows returns the actual rows of the plan. -func (e *RuntimeStatsColl) GetPlanActRows(planID int) int64 { - e.mu.Lock() - defer e.mu.Unlock() - runtimeStats, exists := e.rootStats[planID] - if !exists { - return 0 - } - return runtimeStats.GetActRows() -} - -// GetCopStats gets the CopRuntimeStats specified by planID. -func (e *RuntimeStatsColl) GetCopStats(planID int) *CopRuntimeStats { - e.mu.Lock() - defer e.mu.Unlock() - copStats, ok := e.copStats[planID] - if !ok { - return nil - } - return copStats -} - -// GetCopCountAndRows returns the total cop-tasks count and total rows of all cop-tasks. -func (e *RuntimeStatsColl) GetCopCountAndRows(planID int) (int32, int64) { - e.mu.Lock() - defer e.mu.Unlock() - copStats, ok := e.copStats[planID] - if !ok { - return 0, 0 - } - return copStats.GetTasks(), copStats.GetActRows() -} - -func getPlanIDFromExecutionSummary(summary *tipb.ExecutorExecutionSummary) (int, bool) { - if summary.GetExecutorId() != "" { - strs := strings.Split(summary.GetExecutorId(), "_") - if id, err := strconv.Atoi(strs[len(strs)-1]); err == nil { - return id, true - } - } - return 0, false -} - -// RecordCopStats records a specific cop tasks's execution detail. -func (e *RuntimeStatsColl) RecordCopStats(planID int, storeType kv.StoreType, scan *util.ScanDetail, time util.TimeDetail, summary *tipb.ExecutorExecutionSummary) int { - e.mu.Lock() - defer e.mu.Unlock() - copStats, ok := e.copStats[planID] - if !ok { - copStats = &CopRuntimeStats{ - timeDetail: time, - storeType: storeType, - } - if scan != nil { - copStats.scanDetail = *scan - } - e.copStats[planID] = copStats - } else { - if scan != nil { - copStats.scanDetail.Merge(scan) - } - copStats.timeDetail.Merge(&time) - } - if summary != nil { - // for TiFlash cop response, ExecutorExecutionSummary contains executor id, so if there is a valid executor id in - // summary, use it overwrite the planID - id, valid := getPlanIDFromExecutionSummary(summary) - if valid && id != planID { - planID = id - copStats, ok = e.copStats[planID] - if !ok { - copStats = &CopRuntimeStats{ - storeType: storeType, - } - e.copStats[planID] = copStats - } - } - copStats.stats.mergeExecSummary(summary) - e.stmtCopStats.mergeExecSummary(summary) - } - return planID -} - -// RecordOneCopTask records a specific cop tasks's execution summary. -func (e *RuntimeStatsColl) RecordOneCopTask(planID int, storeType kv.StoreType, summary *tipb.ExecutorExecutionSummary) int { - // for TiFlash cop response, ExecutorExecutionSummary contains executor id, so if there is a valid executor id in - // summary, use it overwrite the planID - if id, valid := getPlanIDFromExecutionSummary(summary); valid { - planID = id - } - e.mu.Lock() - defer e.mu.Unlock() - copStats, ok := e.copStats[planID] - if !ok { - copStats = &CopRuntimeStats{ - storeType: storeType, - } - e.copStats[planID] = copStats - } - copStats.stats.mergeExecSummary(summary) - e.stmtCopStats.mergeExecSummary(summary) - return planID -} - -// ExistsRootStats checks if the planID exists in the rootStats collection. 
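getPlanIDFromExecutionSummary above encodes the convention that a TiFlash ExecutorId ends with the plan id after the last underscore, which is what lets RecordCopStats and RecordOneCopTask overwrite the incoming planID; ExistsRootStats follows below. A quick sketch of the convention, with an illustrative id:

	execID := "ExchangeSender_10" // illustrative "name_planID" shape
	summary := &tipb.ExecutorExecutionSummary{ExecutorId: &execID}
	id, ok := getPlanIDFromExecutionSummary(summary)
	// id == 10, ok == true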
-func (e *RuntimeStatsColl) ExistsRootStats(planID int) bool {
-	e.mu.Lock()
-	defer e.mu.Unlock()
-	_, exists := e.rootStats[planID]
-	return exists
-}
-
-// ExistsCopStats checks if the planID exists in the copStats collection.
-func (e *RuntimeStatsColl) ExistsCopStats(planID int) bool {
-	e.mu.Lock()
-	defer e.mu.Unlock()
-	_, exists := e.copStats[planID]
-	return exists
-}
-
-// ConcurrencyInfo is used to save the concurrency information of the executor operator.
-type ConcurrencyInfo struct {
-	concurrencyName string
-	concurrencyNum  int
-}
-
-// NewConcurrencyInfo creates a new executor's concurrencyInfo.
-func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo {
-	return &ConcurrencyInfo{name, num}
-}
-
-// RuntimeStatsWithConcurrencyInfo is the BasicRuntimeStats with ConcurrencyInfo.
-type RuntimeStatsWithConcurrencyInfo struct {
-	// executor concurrency information
-	concurrency []*ConcurrencyInfo
-	// protect concurrency
-	sync.Mutex
-}
-
-// Tp implements the RuntimeStats interface.
-func (*RuntimeStatsWithConcurrencyInfo) Tp() int {
-	return TpRuntimeStatsWithConcurrencyInfo
-}
-
-// SetConcurrencyInfo sets the concurrency information.
-// The existing concurrencyInfo is cleared first on every call to SetConcurrencyInfo.
-// When num <= 0, it means the executor operator is not executed in parallel.
-func (e *RuntimeStatsWithConcurrencyInfo) SetConcurrencyInfo(infos ...*ConcurrencyInfo) {
-	e.Lock()
-	defer e.Unlock()
-	e.concurrency = e.concurrency[:0]
-	e.concurrency = append(e.concurrency, infos...)
-}
-
-// Clone implements the RuntimeStats interface.
-func (e *RuntimeStatsWithConcurrencyInfo) Clone() RuntimeStats {
-	newRs := &RuntimeStatsWithConcurrencyInfo{
-		concurrency: make([]*ConcurrencyInfo, 0, len(e.concurrency)),
-	}
-	newRs.concurrency = append(newRs.concurrency, e.concurrency...)
-	return newRs
-}
-
-// String implements the RuntimeStats interface.
-func (e *RuntimeStatsWithConcurrencyInfo) String() string {
-	buf := bytes.NewBuffer(make([]byte, 0, 8))
-	if len(e.concurrency) > 0 {
-		for i, concurrency := range e.concurrency {
-			if i > 0 {
-				buf.WriteString(", ")
-			}
-			if concurrency.concurrencyNum > 0 {
-				buf.WriteString(concurrency.concurrencyName)
-				buf.WriteByte(':')
-				buf.WriteString(strconv.Itoa(concurrency.concurrencyNum))
-			} else {
-				buf.WriteString(concurrency.concurrencyName)
-				buf.WriteString(":OFF")
-			}
-		}
-	}
-	return buf.String()
-}
-
-// Merge implements the RuntimeStats interface.
-func (*RuntimeStatsWithConcurrencyInfo) Merge(RuntimeStats) {}
-
-// RuntimeStatsWithCommit is the RuntimeStats with commit detail.
-type RuntimeStatsWithCommit struct {
-	Commit   *util.CommitDetails
-	LockKeys *util.LockKeysDetails
-	TxnCnt   int
-}
-
-// Tp implements the RuntimeStats interface.
-func (*RuntimeStatsWithCommit) Tp() int {
-	return TpRuntimeStatsWithCommit
-}
-
-// MergeCommitDetails merges the commit details.
-func (e *RuntimeStatsWithCommit) MergeCommitDetails(detail *util.CommitDetails) {
-	if detail == nil {
-		return
-	}
-	if e.Commit == nil {
-		e.Commit = detail
-		e.TxnCnt = 1
-		return
-	}
-	e.Commit.Merge(detail)
-	e.TxnCnt++
-}
-
-// Merge implements the RuntimeStats interface.
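SetConcurrencyInfo and String above render each ConcurrencyInfo as name:num, or name:OFF when num <= 0; the Merge implementation of RuntimeStatsWithCommit follows below. A minimal usage sketch:

	stats := &RuntimeStatsWithConcurrencyInfo{}
	stats.SetConcurrencyInfo(
		NewConcurrencyInfo("Concurrency", 4),
		NewConcurrencyInfo("ShuffleConcurrency", 0),
	)
	// stats.String() => "Concurrency:4, ShuffleConcurrency:OFF"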
-func (e *RuntimeStatsWithCommit) Merge(rs RuntimeStats) { - tmp, ok := rs.(*RuntimeStatsWithCommit) - if !ok { - return - } - e.TxnCnt += tmp.TxnCnt - if tmp.Commit != nil { - if e.Commit == nil { - e.Commit = &util.CommitDetails{} - } - e.Commit.Merge(tmp.Commit) - } - - if tmp.LockKeys != nil { - if e.LockKeys == nil { - e.LockKeys = &util.LockKeysDetails{} - } - e.LockKeys.Merge(tmp.LockKeys) - } -} - -// Clone implements the RuntimeStats interface. -func (e *RuntimeStatsWithCommit) Clone() RuntimeStats { - newRs := RuntimeStatsWithCommit{ - TxnCnt: e.TxnCnt, - } - if e.Commit != nil { - newRs.Commit = e.Commit.Clone() - } - if e.LockKeys != nil { - newRs.LockKeys = e.LockKeys.Clone() - } - return &newRs -} - -// String implements the RuntimeStats interface. -func (e *RuntimeStatsWithCommit) String() string { - buf := bytes.NewBuffer(make([]byte, 0, 32)) - if e.Commit != nil { - buf.WriteString("commit_txn: {") - // Only print out when there are more than 1 transaction. - if e.TxnCnt > 1 { - buf.WriteString("count: ") - buf.WriteString(strconv.Itoa(e.TxnCnt)) - buf.WriteString(", ") - } - if e.Commit.PrewriteTime > 0 { - buf.WriteString("prewrite:") - buf.WriteString(FormatDuration(e.Commit.PrewriteTime)) - } - if e.Commit.WaitPrewriteBinlogTime > 0 { - buf.WriteString(", wait_prewrite_binlog:") - buf.WriteString(FormatDuration(e.Commit.WaitPrewriteBinlogTime)) - } - if e.Commit.GetCommitTsTime > 0 { - buf.WriteString(", get_commit_ts:") - buf.WriteString(FormatDuration(e.Commit.GetCommitTsTime)) - } - if e.Commit.CommitTime > 0 { - buf.WriteString(", commit:") - buf.WriteString(FormatDuration(e.Commit.CommitTime)) - } - e.Commit.Mu.Lock() - commitBackoffTime := e.Commit.Mu.CommitBackoffTime - if commitBackoffTime > 0 { - buf.WriteString(", backoff: {time: ") - buf.WriteString(FormatDuration(time.Duration(commitBackoffTime))) - if len(e.Commit.Mu.PrewriteBackoffTypes) > 0 { - buf.WriteString(", prewrite type: ") - e.formatBackoff(buf, e.Commit.Mu.PrewriteBackoffTypes) - } - if len(e.Commit.Mu.CommitBackoffTypes) > 0 { - buf.WriteString(", commit type: ") - e.formatBackoff(buf, e.Commit.Mu.CommitBackoffTypes) - } - buf.WriteString("}") - } - if e.Commit.Mu.SlowestPrewrite.ReqTotalTime > 0 { - buf.WriteString(", slowest_prewrite_rpc: {total: ") - buf.WriteString(strconv.FormatFloat(e.Commit.Mu.SlowestPrewrite.ReqTotalTime.Seconds(), 'f', 3, 64)) - buf.WriteString("s, region_id: ") - buf.WriteString(strconv.FormatUint(e.Commit.Mu.SlowestPrewrite.Region, 10)) - buf.WriteString(", store: ") - buf.WriteString(e.Commit.Mu.SlowestPrewrite.StoreAddr) - buf.WriteString(", ") - buf.WriteString(e.Commit.Mu.SlowestPrewrite.ExecDetails.String()) - buf.WriteString("}") - } - if e.Commit.Mu.CommitPrimary.ReqTotalTime > 0 { - buf.WriteString(", commit_primary_rpc: {total: ") - buf.WriteString(strconv.FormatFloat(e.Commit.Mu.CommitPrimary.ReqTotalTime.Seconds(), 'f', 3, 64)) - buf.WriteString("s, region_id: ") - buf.WriteString(strconv.FormatUint(e.Commit.Mu.CommitPrimary.Region, 10)) - buf.WriteString(", store: ") - buf.WriteString(e.Commit.Mu.CommitPrimary.StoreAddr) - buf.WriteString(", ") - buf.WriteString(e.Commit.Mu.CommitPrimary.ExecDetails.String()) - buf.WriteString("}") - } - e.Commit.Mu.Unlock() - if e.Commit.ResolveLock.ResolveLockTime > 0 { - buf.WriteString(", resolve_lock: ") - buf.WriteString(FormatDuration(time.Duration(e.Commit.ResolveLock.ResolveLockTime))) - } - - prewriteRegionNum := atomic.LoadInt32(&e.Commit.PrewriteRegionNum) - if prewriteRegionNum > 0 { - buf.WriteString(", 
region_num:") - buf.WriteString(strconv.FormatInt(int64(prewriteRegionNum), 10)) - } - if e.Commit.WriteKeys > 0 { - buf.WriteString(", write_keys:") - buf.WriteString(strconv.FormatInt(int64(e.Commit.WriteKeys), 10)) - } - if e.Commit.WriteSize > 0 { - buf.WriteString(", write_byte:") - buf.WriteString(strconv.FormatInt(int64(e.Commit.WriteSize), 10)) - } - if e.Commit.TxnRetry > 0 { - buf.WriteString(", txn_retry:") - buf.WriteString(strconv.FormatInt(int64(e.Commit.TxnRetry), 10)) - } - buf.WriteString("}") - } - if e.LockKeys != nil { - if buf.Len() > 0 { - buf.WriteString(", ") - } - buf.WriteString("lock_keys: {") - if e.LockKeys.TotalTime > 0 { - buf.WriteString("time:") - buf.WriteString(FormatDuration(e.LockKeys.TotalTime)) - } - if e.LockKeys.RegionNum > 0 { - buf.WriteString(", region:") - buf.WriteString(strconv.FormatInt(int64(e.LockKeys.RegionNum), 10)) - } - if e.LockKeys.LockKeys > 0 { - buf.WriteString(", keys:") - buf.WriteString(strconv.FormatInt(int64(e.LockKeys.LockKeys), 10)) - } - if e.LockKeys.ResolveLock.ResolveLockTime > 0 { - buf.WriteString(", resolve_lock:") - buf.WriteString(FormatDuration(time.Duration(e.LockKeys.ResolveLock.ResolveLockTime))) - } - e.LockKeys.Mu.Lock() - if e.LockKeys.BackoffTime > 0 { - buf.WriteString(", backoff: {time: ") - buf.WriteString(FormatDuration(time.Duration(e.LockKeys.BackoffTime))) - if len(e.LockKeys.Mu.BackoffTypes) > 0 { - buf.WriteString(", type: ") - e.formatBackoff(buf, e.LockKeys.Mu.BackoffTypes) - } - buf.WriteString("}") - } - if e.LockKeys.Mu.SlowestReqTotalTime > 0 { - buf.WriteString(", slowest_rpc: {total: ") - buf.WriteString(strconv.FormatFloat(e.LockKeys.Mu.SlowestReqTotalTime.Seconds(), 'f', 3, 64)) - buf.WriteString("s, region_id: ") - buf.WriteString(strconv.FormatUint(e.LockKeys.Mu.SlowestRegion, 10)) - buf.WriteString(", store: ") - buf.WriteString(e.LockKeys.Mu.SlowestStoreAddr) - buf.WriteString(", ") - buf.WriteString(e.LockKeys.Mu.SlowestExecDetails.String()) - buf.WriteString("}") - } - e.LockKeys.Mu.Unlock() - if e.LockKeys.LockRPCTime > 0 { - buf.WriteString(", lock_rpc:") - buf.WriteString(time.Duration(e.LockKeys.LockRPCTime).String()) - } - if e.LockKeys.LockRPCCount > 0 { - buf.WriteString(", rpc_count:") - buf.WriteString(strconv.FormatInt(e.LockKeys.LockRPCCount, 10)) - } - if e.LockKeys.RetryCount > 0 { - buf.WriteString(", retry_count:") - buf.WriteString(strconv.FormatInt(int64(e.LockKeys.RetryCount), 10)) - } - - buf.WriteString("}") - } - return buf.String() -} - -func (*RuntimeStatsWithCommit) formatBackoff(buf *bytes.Buffer, backoffTypes []string) { - if len(backoffTypes) == 0 { - return - } - tpMap := make(map[string]struct{}) - tpArray := []string{} - for _, tpStr := range backoffTypes { - _, ok := tpMap[tpStr] - if ok { - continue - } - tpMap[tpStr] = struct{}{} - tpArray = append(tpArray, tpStr) - } - slices.Sort(tpArray) - buf.WriteByte('[') - for i, tp := range tpArray { - if i > 0 { - buf.WriteString(" ") - } - buf.WriteString(tp) - } - buf.WriteByte(']') -} - -// FormatDuration uses to format duration, this function will prune precision before format duration. -// Pruning precision is for human readability. The prune rule is: -// 1. if the duration was less than 1us, return the original string. -// 2. readable value >=10, keep 1 decimal, otherwise, keep 2 decimal. 
such as: -// 9.412345ms -> 9.41ms -// 10.412345ms -> 10.4ms -// 5.999s -> 6s -// 100.45µs -> 100.5µs -func FormatDuration(d time.Duration) string { - if d <= time.Microsecond { - return d.String() - } - unit := getUnit(d) - if unit == time.Nanosecond { - return d.String() - } - integer := (d / unit) * unit //nolint:durationcheck - decimal := float64(d%unit) / float64(unit) - if d < 10*unit { - decimal = math.Round(decimal*100) / 100 - } else { - decimal = math.Round(decimal*10) / 10 - } - d = integer + time.Duration(decimal*float64(unit)) - return d.String() -} - -func getUnit(d time.Duration) time.Duration { - if d >= time.Second { - return time.Second - } else if d >= time.Millisecond { - return time.Millisecond - } else if d >= time.Microsecond { - return time.Microsecond - } - return time.Nanosecond -} - -// MergeTiFlashRUConsumption merge execution summaries from selectResponse into ruDetails. -func MergeTiFlashRUConsumption(executionSummaries []*tipb.ExecutorExecutionSummary, ruDetails *util.RUDetails) error { - newRUDetails := util.NewRUDetails() - for _, summary := range executionSummaries { - if summary != nil && summary.GetRuConsumption() != nil { - tiflashRU := new(resource_manager.Consumption) - if err := tiflashRU.Unmarshal(summary.GetRuConsumption()); err != nil { - return err - } - newRUDetails.Update(tiflashRU, 0) - } - } - ruDetails.Merge(newRUDetails) - return nil -} - -// RURuntimeStats is a wrapper of util.RUDetails, -// which implements the RuntimeStats interface. -type RURuntimeStats struct { - *util.RUDetails -} - -// String implements the RuntimeStats interface. -func (e *RURuntimeStats) String() string { - if e.RUDetails != nil { - buf := bytes.NewBuffer(make([]byte, 0, 8)) - buf.WriteString("RU:") - buf.WriteString(strconv.FormatFloat(e.RRU()+e.WRU(), 'f', 2, 64)) - return buf.String() - } - return "" -} - -// Clone implements the RuntimeStats interface. -func (e *RURuntimeStats) Clone() RuntimeStats { - return &RURuntimeStats{RUDetails: e.RUDetails.Clone()} -} - -// Merge implements the RuntimeStats interface. -func (e *RURuntimeStats) Merge(other RuntimeStats) { - if tmp, ok := other.(*RURuntimeStats); ok { - if e.RUDetails != nil { - e.RUDetails.Merge(tmp.RUDetails) - } else { - e.RUDetails = tmp.RUDetails.Clone() - } - } -} - -// Tp implements the RuntimeStats interface. -func (*RURuntimeStats) Tp() int { - return TpRURuntimeStats -} diff --git a/pkg/util/execdetails/runtime_stats.go b/pkg/util/execdetails/runtime_stats.go new file mode 100644 index 0000000000..0b4027c7df --- /dev/null +++ b/pkg/util/execdetails/runtime_stats.go @@ -0,0 +1,999 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package execdetails + +import ( + "bytes" + "slices" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tipb/go-tipb" + "github.com/tikv/client-go/v2/util" +) + +const ( + // TpBasicRuntimeStats is the tp for BasicRuntimeStats. 
+ TpBasicRuntimeStats int = iota + // TpRuntimeStatsWithCommit is the tp for RuntimeStatsWithCommit. + TpRuntimeStatsWithCommit + // TpRuntimeStatsWithConcurrencyInfo is the tp for RuntimeStatsWithConcurrencyInfo. + TpRuntimeStatsWithConcurrencyInfo + // TpSnapshotRuntimeStats is the tp for SnapshotRuntimeStats. + TpSnapshotRuntimeStats + // TpHashJoinRuntimeStats is the tp for HashJoinRuntimeStats. + TpHashJoinRuntimeStats + // TpHashJoinRuntimeStatsV2 is the tp for hashJoinRuntimeStatsV2. + TpHashJoinRuntimeStatsV2 + // TpIndexLookUpJoinRuntimeStats is the tp for IndexLookUpJoinRuntimeStats. + TpIndexLookUpJoinRuntimeStats + // TpRuntimeStatsWithSnapshot is the tp for RuntimeStatsWithSnapshot. + TpRuntimeStatsWithSnapshot + // TpJoinRuntimeStats is the tp for JoinRuntimeStats. + TpJoinRuntimeStats + // TpSelectResultRuntimeStats is the tp for SelectResultRuntimeStats. + TpSelectResultRuntimeStats + // TpInsertRuntimeStat is the tp for InsertRuntimeStat + TpInsertRuntimeStat + // TpIndexLookUpRunTimeStats is the tp for IndexLookUpRunTimeStats + TpIndexLookUpRunTimeStats + // TpSlowQueryRuntimeStat is the tp for SlowQueryRuntimeStat + TpSlowQueryRuntimeStat + // TpHashAggRuntimeStat is the tp for HashAggRuntimeStat + TpHashAggRuntimeStat + // TpIndexMergeRunTimeStats is the tp for IndexMergeRunTimeStats + TpIndexMergeRunTimeStats + // TpBasicCopRunTimeStats is the tp for BasicCopRunTimeStats + TpBasicCopRunTimeStats + // TpUpdateRuntimeStats is the tp for UpdateRuntimeStats + TpUpdateRuntimeStats + // TpFKCheckRuntimeStats is the tp for FKCheckRuntimeStats + TpFKCheckRuntimeStats + // TpFKCascadeRuntimeStats is the tp for FKCascadeRuntimeStats + TpFKCascadeRuntimeStats + // TpRURuntimeStats is the tp for RURuntimeStats + TpRURuntimeStats +) + +// RuntimeStats is used to express the executor runtime information. +type RuntimeStats interface { + String() string + Merge(RuntimeStats) + Clone() RuntimeStats + Tp() int +} + +type basicCopRuntimeStats struct { + loop int32 + rows int64 + threads int32 + procTimes Percentile[Duration] + // executor extra infos + tiflashStats *TiflashStats +} + +// String implements the RuntimeStats interface. +func (e *basicCopRuntimeStats) String() string { + buf := bytes.NewBuffer(make([]byte, 0, 16)) + buf.WriteString("time:") + buf.WriteString(FormatDuration(time.Duration(e.procTimes.sumVal))) + buf.WriteString(", loops:") + buf.WriteString(strconv.Itoa(int(e.loop))) + if e.tiflashStats != nil { + buf.WriteString(", threads:") + buf.WriteString(strconv.Itoa(int(e.threads))) + if !e.tiflashStats.waitSummary.CanBeIgnored() { + buf.WriteString(", ") + buf.WriteString(e.tiflashStats.waitSummary.String()) + } + if !e.tiflashStats.networkSummary.Empty() { + buf.WriteString(", ") + buf.WriteString(e.tiflashStats.networkSummary.String()) + } + buf.WriteString(", ") + buf.WriteString(e.tiflashStats.scanContext.String()) + } + return buf.String() +} + +// Clone implements the RuntimeStats interface. +func (e *basicCopRuntimeStats) Clone() RuntimeStats { + stats := &basicCopRuntimeStats{ + loop: e.loop, + rows: e.rows, + threads: e.threads, + procTimes: e.procTimes, + } + if e.tiflashStats != nil { + stats.tiflashStats = &TiflashStats{ + scanContext: e.tiflashStats.scanContext.Clone(), + waitSummary: e.tiflashStats.waitSummary.Clone(), + networkSummary: e.tiflashStats.networkSummary.Clone(), + } + } + return stats +} + +// Merge implements the RuntimeStats interface. 
+func (e *basicCopRuntimeStats) Merge(rs RuntimeStats) {
+ tmp, ok := rs.(*basicCopRuntimeStats)
+ if !ok {
+ return
+ }
+ e.loop += tmp.loop
+ e.rows += tmp.rows
+ e.threads += tmp.threads
+ if tmp.procTimes.Size() > 0 {
+ e.procTimes.MergePercentile(&tmp.procTimes)
+ }
+ if tmp.tiflashStats != nil {
+ if e.tiflashStats == nil {
+ e.tiflashStats = &TiflashStats{}
+ }
+ e.tiflashStats.scanContext.Merge(tmp.tiflashStats.scanContext)
+ e.tiflashStats.waitSummary.Merge(tmp.tiflashStats.waitSummary)
+ e.tiflashStats.networkSummary.Merge(tmp.tiflashStats.networkSummary)
+ }
+}
+
+// mergeExecSummary is like Merge, but it merges an ExecutorExecutionSummary directly.
+func (e *basicCopRuntimeStats) mergeExecSummary(summary *tipb.ExecutorExecutionSummary) {
+ e.loop += (int32(*summary.NumIterations))
+ e.rows += (int64(*summary.NumProducedRows))
+ e.threads += int32(summary.GetConcurrency())
+ e.procTimes.Add(Duration(int64(*summary.TimeProcessedNs)))
+ if tiflashScanContext := summary.GetTiflashScanContext(); tiflashScanContext != nil {
+ if e.tiflashStats == nil {
+ e.tiflashStats = &TiflashStats{}
+ }
+ e.tiflashStats.scanContext.mergeExecSummary(tiflashScanContext)
+ }
+ if tiflashWaitSummary := summary.GetTiflashWaitSummary(); tiflashWaitSummary != nil {
+ if e.tiflashStats == nil {
+ e.tiflashStats = &TiflashStats{}
+ }
+ e.tiflashStats.waitSummary.mergeExecSummary(tiflashWaitSummary, *summary.TimeProcessedNs)
+ }
+ if tiflashNetworkSummary := summary.GetTiflashNetworkSummary(); tiflashNetworkSummary != nil {
+ if e.tiflashStats == nil {
+ e.tiflashStats = &TiflashStats{}
+ }
+ e.tiflashStats.networkSummary.mergeExecSummary(tiflashNetworkSummary)
+ }
+}
+
+// Tp implements the RuntimeStats interface.
+func (*basicCopRuntimeStats) Tp() int {
+ return TpBasicCopRunTimeStats
+}
+
+// StmtCopRuntimeStats stores the cop runtime stats of the whole statement.
+type StmtCopRuntimeStats struct {
+ // TiflashNetworkStats aggregates the network traffic info of all MPP tasks; it is nil if no MPP task reported any network traffic.
+ TiflashNetworkStats *TiFlashNetworkTrafficSummary
+}
+
+// mergeExecSummary merges ExecutorExecutionSummary into stmt cop runtime stats directly.
+func (e *StmtCopRuntimeStats) mergeExecSummary(summary *tipb.ExecutorExecutionSummary) {
+ if tiflashNetworkSummary := summary.GetTiflashNetworkSummary(); tiflashNetworkSummary != nil {
+ if e.TiflashNetworkStats == nil {
+ e.TiflashNetworkStats = &TiFlashNetworkTrafficSummary{}
+ }
+ e.TiflashNetworkStats.mergeExecSummary(tiflashNetworkSummary)
+ }
+}
+
+// CopRuntimeStats collects cop tasks' execution info.
+type CopRuntimeStats struct {
+ // stats stores the merged runtime statistics of all coprocessor tasks for
+ // this plan. Because a tikv-server can have many region leaders, several
+ // coprocessor tasks can be sent to the same tikv-server instance; their
+ // execution summaries are all merged into this field.
+ stats basicCopRuntimeStats
+ scanDetail util.ScanDetail
+ timeDetail util.TimeDetail
+ storeType kv.StoreType
+}
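+
+// Illustrative only: a minimal sketch of feeding one cop task's summary into
+// CopRuntimeStats. The values below are hypothetical; the tipb summary fields
+// are pointers and must be populated before mergeExecSummary dereferences them.
+//
+// loops, rows, procNs := uint64(1), uint64(10), uint64(1500000)
+// summary := &tipb.ExecutorExecutionSummary{
+// NumIterations: &loops,
+// NumProducedRows: &rows,
+// TimeProcessedNs: &procNs,
+// }
+// crs := &CopRuntimeStats{storeType: kv.TiKV}
+// crs.stats.mergeExecSummary(summary)
+// _ = crs.String() // renders like "tikv_task:{time:1.5ms, loops:1}"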
+
+// GetActRows returns the total rows of CopRuntimeStats.
+func (crs *CopRuntimeStats) GetActRows() int64 {
+ return crs.stats.rows
+}
+
+// GetTasks returns the total task count of CopRuntimeStats.
+func (crs *CopRuntimeStats) GetTasks() int32 {
+ return int32(crs.stats.procTimes.size)
+}
+
+var zeroTimeDetail = util.TimeDetail{}
+
+func (crs *CopRuntimeStats) String() string {
+ procTimes := crs.stats.procTimes
+ totalTasks := procTimes.size
+ isTiFlashCop := crs.storeType == kv.TiFlash
+ buf := bytes.NewBuffer(make([]byte, 0, 16))
+ {
+ printTiFlashSpecificInfo := func() {
+ if isTiFlashCop {
+ buf.WriteString(", ")
+ buf.WriteString("threads:")
+ buf.WriteString(strconv.Itoa(int(crs.stats.threads)))
+ buf.WriteString("}")
+ if crs.stats.tiflashStats != nil {
+ if !crs.stats.tiflashStats.waitSummary.CanBeIgnored() {
+ buf.WriteString(", ")
+ buf.WriteString(crs.stats.tiflashStats.waitSummary.String())
+ }
+ if !crs.stats.tiflashStats.networkSummary.Empty() {
+ buf.WriteString(", ")
+ buf.WriteString(crs.stats.tiflashStats.networkSummary.String())
+ }
+ if !crs.stats.tiflashStats.scanContext.Empty() {
+ buf.WriteString(", ")
+ buf.WriteString(crs.stats.tiflashStats.scanContext.String())
+ }
+ }
+ } else {
+ buf.WriteString("}")
+ }
+ }
+ if totalTasks == 1 {
+ buf.WriteString(crs.storeType.Name())
+ buf.WriteString("_task:{time:")
+ buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0))))
+ buf.WriteString(", loops:")
+ buf.WriteString(strconv.Itoa(int(crs.stats.loop)))
+ printTiFlashSpecificInfo()
+ } else if totalTasks > 0 {
+ buf.WriteString(crs.storeType.Name())
+ buf.WriteString("_task:{proc max:")
+ buf.WriteString(FormatDuration(time.Duration(procTimes.GetMax().GetFloat64())))
+ buf.WriteString(", min:")
+ buf.WriteString(FormatDuration(time.Duration(procTimes.GetMin().GetFloat64())))
+ buf.WriteString(", avg: ")
+ buf.WriteString(FormatDuration(time.Duration(int64(procTimes.Sum()) / int64(totalTasks))))
+ buf.WriteString(", p80:")
+ buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0.8))))
+ buf.WriteString(", p95:")
+ buf.WriteString(FormatDuration(time.Duration(procTimes.GetPercentile(0.95))))
+ buf.WriteString(", iters:")
+ buf.WriteString(strconv.Itoa(int(crs.stats.loop)))
+ buf.WriteString(", tasks:")
+ buf.WriteString(strconv.Itoa(totalTasks))
+ printTiFlashSpecificInfo()
+ }
+ }
+ if !isTiFlashCop {
+ detail := crs.scanDetail.String()
+ if detail != "" {
+ buf.WriteString(", ")
+ buf.WriteString(detail)
+ }
+ if crs.timeDetail != zeroTimeDetail {
+ timeDetailStr := crs.timeDetail.String()
+ if timeDetailStr != "" {
+ buf.WriteString(", ")
+ buf.WriteString(timeDetailStr)
+ }
+ }
+ }
+ return buf.String()
+}
+
+// BasicRuntimeStats is the basic runtime stats.
+type BasicRuntimeStats struct {
+ // the count of executors with the same id
+ executorCount atomic.Int32
+ // the number of times the executor's Next() has been called
+ loop atomic.Int32
+ // total time consumed by the executor, including open, next, and close time
+ consume atomic.Int64
+ // executor open time
+ open atomic.Int64
+ // executor close time
+ close atomic.Int64
+ // the number of rows returned by the executor
+ rows atomic.Int64
+}
+
+// GetActRows returns the total rows of BasicRuntimeStats.
+func (e *BasicRuntimeStats) GetActRows() int64 {
+ return e.rows.Load()
+}
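+
+// As a hedged illustration (the executor-side fetch below is hypothetical and
+// not part of this package), an executor typically drives these counters once
+// per Next() call, while open/close durations go through RecordOpen/RecordClose
+// and accumulate into the same consume total:
+//
+// start := time.Now()
+// rows := fetchNextChunk() // hypothetical helper returning the row count
+// basic.Record(time.Since(start), rows)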
+
+// Clone implements the RuntimeStats interface.
+// BasicRuntimeStats shouldn't implement the Clone interface, because all executors with the same executor_id
+// should share the same BasicRuntimeStats; duplicated BasicRuntimeStats easily cause mistakes.
+func (*BasicRuntimeStats) Clone() RuntimeStats {
+ panic("BasicRuntimeStats should not implement Clone function")
+}
+
+// Merge implements the RuntimeStats interface.
+func (e *BasicRuntimeStats) Merge(rs RuntimeStats) {
+ tmp, ok := rs.(*BasicRuntimeStats)
+ if !ok {
+ return
+ }
+ e.loop.Add(tmp.loop.Load())
+ e.consume.Add(tmp.consume.Load())
+ e.open.Add(tmp.open.Load())
+ e.close.Add(tmp.close.Load())
+ e.rows.Add(tmp.rows.Load())
+}
+
+// Tp implements the RuntimeStats interface.
+func (*BasicRuntimeStats) Tp() int {
+ return TpBasicRuntimeStats
+}
+
+// RootRuntimeStats is the executor runtime stats that combines multiple runtime stats.
+type RootRuntimeStats struct {
+ basic *BasicRuntimeStats
+ groupRss []RuntimeStats
+}
+
+// NewRootRuntimeStats returns a new RootRuntimeStats.
+func NewRootRuntimeStats() *RootRuntimeStats {
+ return &RootRuntimeStats{}
+}
+
+// GetActRows returns the total rows of RootRuntimeStats.
+func (e *RootRuntimeStats) GetActRows() int64 {
+ if e.basic == nil {
+ return 0
+ }
+ return e.basic.rows.Load()
+}
+
+// MergeStats merges stats in the RootRuntimeStats and returns the stats suitable for display directly.
+func (e *RootRuntimeStats) MergeStats() (basic *BasicRuntimeStats, groups []RuntimeStats) {
+ return e.basic, e.groupRss
+}
+
+// String implements the RuntimeStats interface.
+func (e *RootRuntimeStats) String() string {
+ basic, groups := e.MergeStats()
+ strs := make([]string, 0, len(groups)+1)
+ if basic != nil {
+ strs = append(strs, basic.String())
+ }
+ for _, group := range groups {
+ str := group.String()
+ if len(str) > 0 {
+ strs = append(strs, str)
+ }
+ }
+ return strings.Join(strs, ", ")
+}
+
+// Record records the executor's execution.
+func (e *BasicRuntimeStats) Record(d time.Duration, rowNum int) {
+ e.loop.Add(1)
+ e.consume.Add(int64(d))
+ e.rows.Add(int64(rowNum))
+}
+
+// RecordOpen records the executor's open time.
+func (e *BasicRuntimeStats) RecordOpen(d time.Duration) {
+ e.consume.Add(int64(d))
+ e.open.Add(int64(d))
+}
+
+// RecordClose records the executor's close time.
+func (e *BasicRuntimeStats) RecordClose(d time.Duration) {
+ e.consume.Add(int64(d))
+ e.close.Add(int64(d))
+}
+
+// SetRowNum sets the row num.
+func (e *BasicRuntimeStats) SetRowNum(rowNum int64) {
+ e.rows.Store(rowNum)
+}
+
+// String implements the RuntimeStats interface.
+func (e *BasicRuntimeStats) String() string {
+ if e == nil {
+ return ""
+ }
+ var str strings.Builder
+ timePrefix := ""
+ if e.executorCount.Load() > 1 {
+ timePrefix = "total_"
+ }
+ totalTime := e.consume.Load()
+ openTime := e.open.Load()
+ closeTime := e.close.Load()
+ str.WriteString(timePrefix)
+ str.WriteString("time:")
+ str.WriteString(FormatDuration(time.Duration(totalTime)))
+ str.WriteString(", ")
+ str.WriteString(timePrefix)
+ str.WriteString("open:")
+ str.WriteString(FormatDuration(time.Duration(openTime)))
+ str.WriteString(", ")
+ str.WriteString(timePrefix)
+ str.WriteString("close:")
+ str.WriteString(FormatDuration(time.Duration(closeTime)))
+ str.WriteString(", loops:")
+ str.WriteString(strconv.FormatInt(int64(e.loop.Load()), 10))
+ return str.String()
+}
+
+// GetTime gets the total consumed time as an int64.
+func (e *BasicRuntimeStats) GetTime() int64 {
+ return e.consume.Load()
+}
+
+// RuntimeStatsColl collects executors' execution info.
+type RuntimeStatsColl struct {
+ rootStats map[int]*RootRuntimeStats
+ copStats map[int]*CopRuntimeStats
+ stmtCopStats StmtCopRuntimeStats
+ mu sync.Mutex
+}
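+
+// A short sketch of the intended reuse pattern (illustrative; holding the
+// collector in a statement-scoped variable is an assumption of this example):
+//
+// var coll *RuntimeStatsColl
+// coll = NewRuntimeStatsColl(nil) // first statement: allocate fresh maps
+// coll = NewRuntimeStatsColl(coll) // later statements: clear and reuse the maps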
+
+// NewRuntimeStatsColl creates a new executor collector.
+// Pass a non-nil *RuntimeStatsColl to reuse the object and reduce allocation.
+func NewRuntimeStatsColl(reuse *RuntimeStatsColl) *RuntimeStatsColl {
+ if reuse != nil {
+ // Reusing the maps is cheaper than creating new map objects.
+ // The Go compiler optimizes this delete-in-range cleanup pattern into a single map-clear operation.
+ reuse.mu.Lock()
+ defer reuse.mu.Unlock()
+ for k := range reuse.rootStats {
+ delete(reuse.rootStats, k)
+ }
+ for k := range reuse.copStats {
+ delete(reuse.copStats, k)
+ }
+ return reuse
+ }
+ return &RuntimeStatsColl{
+ rootStats: make(map[int]*RootRuntimeStats),
+ copStats: make(map[int]*CopRuntimeStats),
+ }
+}
+
+// RegisterStats registers execStats for an executor.
+func (e *RuntimeStatsColl) RegisterStats(planID int, info RuntimeStats) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ stats, ok := e.rootStats[planID]
+ if !ok {
+ stats = NewRootRuntimeStats()
+ e.rootStats[planID] = stats
+ }
+ tp := info.Tp()
+ found := false
+ for _, rss := range stats.groupRss {
+ if rss.Tp() == tp {
+ rss.Merge(info)
+ found = true
+ break
+ }
+ }
+ if !found {
+ stats.groupRss = append(stats.groupRss, info)
+ }
+}
+
+// GetBasicRuntimeStats gets the basicRuntimeStats for an executor.
+// When the rootStats entry or its basicRuntimeStats is nil, the behavior is decided by the initNewExecutorStats argument:
+// 1. If true, it creates a new one and increases basicRuntimeStats' executorCount.
+// 2. Otherwise, it returns nil.
+func (e *RuntimeStatsColl) GetBasicRuntimeStats(planID int, initNewExecutorStats bool) *BasicRuntimeStats {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ stats, ok := e.rootStats[planID]
+ if !ok && initNewExecutorStats {
+ stats = NewRootRuntimeStats()
+ e.rootStats[planID] = stats
+ }
+ if stats == nil {
+ return nil
+ }
+
+ if stats.basic == nil && initNewExecutorStats {
+ stats.basic = &BasicRuntimeStats{}
+ stats.basic.executorCount.Add(1)
+ } else if stats.basic != nil && initNewExecutorStats {
+ stats.basic.executorCount.Add(1)
+ }
+ return stats.basic
+}
+
+// GetStmtCopRuntimeStats returns the statement-level cop runtime stats.
+func (e *RuntimeStatsColl) GetStmtCopRuntimeStats() StmtCopRuntimeStats {
+ return e.stmtCopStats
+}
+
+// GetRootStats gets the RootRuntimeStats for an executor, creating it if absent.
+func (e *RuntimeStatsColl) GetRootStats(planID int) *RootRuntimeStats {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ runtimeStats, exists := e.rootStats[planID]
+ if !exists {
+ runtimeStats = NewRootRuntimeStats()
+ e.rootStats[planID] = runtimeStats
+ }
+ return runtimeStats
+}
+
+// GetPlanActRows returns the actual rows of the plan.
+func (e *RuntimeStatsColl) GetPlanActRows(planID int) int64 {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ runtimeStats, exists := e.rootStats[planID]
+ if !exists {
+ return 0
+ }
+ return runtimeStats.GetActRows()
+}
+
+// GetCopStats gets the CopRuntimeStats specified by planID.
+func (e *RuntimeStatsColl) GetCopStats(planID int) *CopRuntimeStats {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ copStats, ok := e.copStats[planID]
+ if !ok {
+ return nil
+ }
+ return copStats
+}
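+
+// Illustrative read/write split (planID and elapsed are hypothetical): the
+// execution path creates stats on demand, while rendering paths only read:
+//
+// basic := coll.GetBasicRuntimeStats(planID, true) // create if absent, count one executor instance
+// basic.Record(elapsed, 10)
+// if b := coll.GetBasicRuntimeStats(planID, false); b != nil {
+// _ = b.String() // read-only lookup, returns nil when nothing was recorded
+// }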
+
+// GetCopCountAndRows returns the total cop-task count and total rows of all cop-tasks.
+func (e *RuntimeStatsColl) GetCopCountAndRows(planID int) (int32, int64) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ copStats, ok := e.copStats[planID]
+ if !ok {
+ return 0, 0
+ }
+ return copStats.GetTasks(), copStats.GetActRows()
+}
+
+func getPlanIDFromExecutionSummary(summary *tipb.ExecutorExecutionSummary) (int, bool) {
+ if summary.GetExecutorId() != "" {
+ strs := strings.Split(summary.GetExecutorId(), "_")
+ if id, err := strconv.Atoi(strs[len(strs)-1]); err == nil {
+ return id, true
+ }
+ }
+ return 0, false
+}
+
+// RecordCopStats records a specific cop task's execution details.
+func (e *RuntimeStatsColl) RecordCopStats(planID int, storeType kv.StoreType, scan *util.ScanDetail, time util.TimeDetail, summary *tipb.ExecutorExecutionSummary) int {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ copStats, ok := e.copStats[planID]
+ if !ok {
+ copStats = &CopRuntimeStats{
+ timeDetail: time,
+ storeType: storeType,
+ }
+ if scan != nil {
+ copStats.scanDetail = *scan
+ }
+ e.copStats[planID] = copStats
+ } else {
+ if scan != nil {
+ copStats.scanDetail.Merge(scan)
+ }
+ copStats.timeDetail.Merge(&time)
+ }
+ if summary != nil {
+ // For a TiFlash cop response, ExecutorExecutionSummary contains the executor id, so if there is a valid
+ // executor id in the summary, use it to overwrite the planID.
+ id, valid := getPlanIDFromExecutionSummary(summary)
+ if valid && id != planID {
+ planID = id
+ copStats, ok = e.copStats[planID]
+ if !ok {
+ copStats = &CopRuntimeStats{
+ storeType: storeType,
+ }
+ e.copStats[planID] = copStats
+ }
+ }
+ copStats.stats.mergeExecSummary(summary)
+ e.stmtCopStats.mergeExecSummary(summary)
+ }
+ return planID
+}
+
+// RecordOneCopTask records a specific cop task's execution summary.
+func (e *RuntimeStatsColl) RecordOneCopTask(planID int, storeType kv.StoreType, summary *tipb.ExecutorExecutionSummary) int {
+ // For a TiFlash cop response, ExecutorExecutionSummary contains the executor id, so if there is a valid
+ // executor id in the summary, use it to overwrite the planID.
+ if id, valid := getPlanIDFromExecutionSummary(summary); valid {
+ planID = id
+ }
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ copStats, ok := e.copStats[planID]
+ if !ok {
+ copStats = &CopRuntimeStats{
+ storeType: storeType,
+ }
+ e.copStats[planID] = copStats
+ }
+ copStats.stats.mergeExecSummary(summary)
+ e.stmtCopStats.mergeExecSummary(summary)
+ return planID
+}
+
+// ExistsRootStats checks if the planID exists in the rootStats collection.
+func (e *RuntimeStatsColl) ExistsRootStats(planID int) bool {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ _, exists := e.rootStats[planID]
+ return exists
+}
+
+// ExistsCopStats checks if the planID exists in the copStats collection.
+func (e *RuntimeStatsColl) ExistsCopStats(planID int) bool {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ _, exists := e.copStats[planID]
+ return exists
+}
+
+// ConcurrencyInfo is used to save the concurrency information of the executor operator.
+type ConcurrencyInfo struct {
+ concurrencyName string
+ concurrencyNum int
+}
+
+// NewConcurrencyInfo creates a new executor ConcurrencyInfo.
+func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo {
+ return &ConcurrencyInfo{name, num}
+}
+
+// RuntimeStatsWithConcurrencyInfo is the BasicRuntimeStats with ConcurrencyInfo.
+type RuntimeStatsWithConcurrencyInfo struct {
+ // executor concurrency information
+ concurrency []*ConcurrencyInfo
+ // protect concurrency
+ sync.Mutex
+}
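+
+// For illustration, an operator with a worker pool of 4 would report itself as
+// below; num <= 0 renders as ":OFF" (the name "Concurrency" is just an example):
+//
+// rs := &RuntimeStatsWithConcurrencyInfo{}
+// rs.SetConcurrencyInfo(NewConcurrencyInfo("Concurrency", 4))
+// _ = rs.String() // "Concurrency:4"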
+
+// Tp implements the RuntimeStats interface.
+func (*RuntimeStatsWithConcurrencyInfo) Tp() int {
+ return TpRuntimeStatsWithConcurrencyInfo
+}
+
+// SetConcurrencyInfo sets the concurrency information.
+// The existing concurrencyInfo is cleared before the new one is set.
+// When num <= 0, it means the executor operator is not executed in parallel.
+func (e *RuntimeStatsWithConcurrencyInfo) SetConcurrencyInfo(infos ...*ConcurrencyInfo) {
+ e.Lock()
+ defer e.Unlock()
+ e.concurrency = e.concurrency[:0]
+ e.concurrency = append(e.concurrency, infos...)
+}
+
+// Clone implements the RuntimeStats interface.
+func (e *RuntimeStatsWithConcurrencyInfo) Clone() RuntimeStats {
+ newRs := &RuntimeStatsWithConcurrencyInfo{
+ concurrency: make([]*ConcurrencyInfo, 0, len(e.concurrency)),
+ }
+ newRs.concurrency = append(newRs.concurrency, e.concurrency...)
+ return newRs
+}
+
+// String implements the RuntimeStats interface.
+func (e *RuntimeStatsWithConcurrencyInfo) String() string {
+ buf := bytes.NewBuffer(make([]byte, 0, 8))
+ if len(e.concurrency) > 0 {
+ for i, concurrency := range e.concurrency {
+ if i > 0 {
+ buf.WriteString(", ")
+ }
+ if concurrency.concurrencyNum > 0 {
+ buf.WriteString(concurrency.concurrencyName)
+ buf.WriteByte(':')
+ buf.WriteString(strconv.Itoa(concurrency.concurrencyNum))
+ } else {
+ buf.WriteString(concurrency.concurrencyName)
+ buf.WriteString(":OFF")
+ }
+ }
+ }
+ return buf.String()
+}
+
+// Merge implements the RuntimeStats interface.
+func (*RuntimeStatsWithConcurrencyInfo) Merge(RuntimeStats) {}
+
+// RuntimeStatsWithCommit is the RuntimeStats with commit detail.
+type RuntimeStatsWithCommit struct {
+ Commit *util.CommitDetails
+ LockKeys *util.LockKeysDetails
+ TxnCnt int
+}
+
+// Tp implements the RuntimeStats interface.
+func (*RuntimeStatsWithCommit) Tp() int {
+ return TpRuntimeStatsWithCommit
+}
+
+// MergeCommitDetails merges the commit details.
+func (e *RuntimeStatsWithCommit) MergeCommitDetails(detail *util.CommitDetails) {
+ if detail == nil {
+ return
+ }
+ if e.Commit == nil {
+ e.Commit = detail
+ e.TxnCnt = 1
+ return
+ }
+ e.Commit.Merge(detail)
+ e.TxnCnt++
+}
+
+// Merge implements the RuntimeStats interface.
+func (e *RuntimeStatsWithCommit) Merge(rs RuntimeStats) {
+ tmp, ok := rs.(*RuntimeStatsWithCommit)
+ if !ok {
+ return
+ }
+ e.TxnCnt += tmp.TxnCnt
+ if tmp.Commit != nil {
+ if e.Commit == nil {
+ e.Commit = &util.CommitDetails{}
+ }
+ e.Commit.Merge(tmp.Commit)
+ }
+
+ if tmp.LockKeys != nil {
+ if e.LockKeys == nil {
+ e.LockKeys = &util.LockKeysDetails{}
+ }
+ e.LockKeys.Merge(tmp.LockKeys)
+ }
+}
+
+// Clone implements the RuntimeStats interface.
+func (e *RuntimeStatsWithCommit) Clone() RuntimeStats {
+ newRs := RuntimeStatsWithCommit{
+ TxnCnt: e.TxnCnt,
+ }
+ if e.Commit != nil {
+ newRs.Commit = e.Commit.Clone()
+ }
+ if e.LockKeys != nil {
+ newRs.LockKeys = e.LockKeys.Clone()
+ }
+ return &newRs
+}
+
+// String implements the RuntimeStats interface.
+func (e *RuntimeStatsWithCommit) String() string {
+ buf := bytes.NewBuffer(make([]byte, 0, 32))
+ if e.Commit != nil {
+ buf.WriteString("commit_txn: {")
+ // Only print the count when there is more than one transaction.
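+ // For reference, a typical rendering with hypothetical values looks like:
+ // commit_txn: {prewrite:2ms, get_commit_ts:1ms, commit:3ms, region_num:2, write_keys:10, write_byte:1024}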
+ if e.TxnCnt > 1 { + buf.WriteString("count: ") + buf.WriteString(strconv.Itoa(e.TxnCnt)) + buf.WriteString(", ") + } + if e.Commit.PrewriteTime > 0 { + buf.WriteString("prewrite:") + buf.WriteString(FormatDuration(e.Commit.PrewriteTime)) + } + if e.Commit.WaitPrewriteBinlogTime > 0 { + buf.WriteString(", wait_prewrite_binlog:") + buf.WriteString(FormatDuration(e.Commit.WaitPrewriteBinlogTime)) + } + if e.Commit.GetCommitTsTime > 0 { + buf.WriteString(", get_commit_ts:") + buf.WriteString(FormatDuration(e.Commit.GetCommitTsTime)) + } + if e.Commit.CommitTime > 0 { + buf.WriteString(", commit:") + buf.WriteString(FormatDuration(e.Commit.CommitTime)) + } + e.Commit.Mu.Lock() + commitBackoffTime := e.Commit.Mu.CommitBackoffTime + if commitBackoffTime > 0 { + buf.WriteString(", backoff: {time: ") + buf.WriteString(FormatDuration(time.Duration(commitBackoffTime))) + if len(e.Commit.Mu.PrewriteBackoffTypes) > 0 { + buf.WriteString(", prewrite type: ") + e.formatBackoff(buf, e.Commit.Mu.PrewriteBackoffTypes) + } + if len(e.Commit.Mu.CommitBackoffTypes) > 0 { + buf.WriteString(", commit type: ") + e.formatBackoff(buf, e.Commit.Mu.CommitBackoffTypes) + } + buf.WriteString("}") + } + if e.Commit.Mu.SlowestPrewrite.ReqTotalTime > 0 { + buf.WriteString(", slowest_prewrite_rpc: {total: ") + buf.WriteString(strconv.FormatFloat(e.Commit.Mu.SlowestPrewrite.ReqTotalTime.Seconds(), 'f', 3, 64)) + buf.WriteString("s, region_id: ") + buf.WriteString(strconv.FormatUint(e.Commit.Mu.SlowestPrewrite.Region, 10)) + buf.WriteString(", store: ") + buf.WriteString(e.Commit.Mu.SlowestPrewrite.StoreAddr) + buf.WriteString(", ") + buf.WriteString(e.Commit.Mu.SlowestPrewrite.ExecDetails.String()) + buf.WriteString("}") + } + if e.Commit.Mu.CommitPrimary.ReqTotalTime > 0 { + buf.WriteString(", commit_primary_rpc: {total: ") + buf.WriteString(strconv.FormatFloat(e.Commit.Mu.CommitPrimary.ReqTotalTime.Seconds(), 'f', 3, 64)) + buf.WriteString("s, region_id: ") + buf.WriteString(strconv.FormatUint(e.Commit.Mu.CommitPrimary.Region, 10)) + buf.WriteString(", store: ") + buf.WriteString(e.Commit.Mu.CommitPrimary.StoreAddr) + buf.WriteString(", ") + buf.WriteString(e.Commit.Mu.CommitPrimary.ExecDetails.String()) + buf.WriteString("}") + } + e.Commit.Mu.Unlock() + if e.Commit.ResolveLock.ResolveLockTime > 0 { + buf.WriteString(", resolve_lock: ") + buf.WriteString(FormatDuration(time.Duration(e.Commit.ResolveLock.ResolveLockTime))) + } + + prewriteRegionNum := atomic.LoadInt32(&e.Commit.PrewriteRegionNum) + if prewriteRegionNum > 0 { + buf.WriteString(", region_num:") + buf.WriteString(strconv.FormatInt(int64(prewriteRegionNum), 10)) + } + if e.Commit.WriteKeys > 0 { + buf.WriteString(", write_keys:") + buf.WriteString(strconv.FormatInt(int64(e.Commit.WriteKeys), 10)) + } + if e.Commit.WriteSize > 0 { + buf.WriteString(", write_byte:") + buf.WriteString(strconv.FormatInt(int64(e.Commit.WriteSize), 10)) + } + if e.Commit.TxnRetry > 0 { + buf.WriteString(", txn_retry:") + buf.WriteString(strconv.FormatInt(int64(e.Commit.TxnRetry), 10)) + } + buf.WriteString("}") + } + if e.LockKeys != nil { + if buf.Len() > 0 { + buf.WriteString(", ") + } + buf.WriteString("lock_keys: {") + if e.LockKeys.TotalTime > 0 { + buf.WriteString("time:") + buf.WriteString(FormatDuration(e.LockKeys.TotalTime)) + } + if e.LockKeys.RegionNum > 0 { + buf.WriteString(", region:") + buf.WriteString(strconv.FormatInt(int64(e.LockKeys.RegionNum), 10)) + } + if e.LockKeys.LockKeys > 0 { + buf.WriteString(", keys:") + 
buf.WriteString(strconv.FormatInt(int64(e.LockKeys.LockKeys), 10)) + } + if e.LockKeys.ResolveLock.ResolveLockTime > 0 { + buf.WriteString(", resolve_lock:") + buf.WriteString(FormatDuration(time.Duration(e.LockKeys.ResolveLock.ResolveLockTime))) + } + e.LockKeys.Mu.Lock() + if e.LockKeys.BackoffTime > 0 { + buf.WriteString(", backoff: {time: ") + buf.WriteString(FormatDuration(time.Duration(e.LockKeys.BackoffTime))) + if len(e.LockKeys.Mu.BackoffTypes) > 0 { + buf.WriteString(", type: ") + e.formatBackoff(buf, e.LockKeys.Mu.BackoffTypes) + } + buf.WriteString("}") + } + if e.LockKeys.Mu.SlowestReqTotalTime > 0 { + buf.WriteString(", slowest_rpc: {total: ") + buf.WriteString(strconv.FormatFloat(e.LockKeys.Mu.SlowestReqTotalTime.Seconds(), 'f', 3, 64)) + buf.WriteString("s, region_id: ") + buf.WriteString(strconv.FormatUint(e.LockKeys.Mu.SlowestRegion, 10)) + buf.WriteString(", store: ") + buf.WriteString(e.LockKeys.Mu.SlowestStoreAddr) + buf.WriteString(", ") + buf.WriteString(e.LockKeys.Mu.SlowestExecDetails.String()) + buf.WriteString("}") + } + e.LockKeys.Mu.Unlock() + if e.LockKeys.LockRPCTime > 0 { + buf.WriteString(", lock_rpc:") + buf.WriteString(time.Duration(e.LockKeys.LockRPCTime).String()) + } + if e.LockKeys.LockRPCCount > 0 { + buf.WriteString(", rpc_count:") + buf.WriteString(strconv.FormatInt(e.LockKeys.LockRPCCount, 10)) + } + if e.LockKeys.RetryCount > 0 { + buf.WriteString(", retry_count:") + buf.WriteString(strconv.FormatInt(int64(e.LockKeys.RetryCount), 10)) + } + + buf.WriteString("}") + } + return buf.String() +} + +func (*RuntimeStatsWithCommit) formatBackoff(buf *bytes.Buffer, backoffTypes []string) { + if len(backoffTypes) == 0 { + return + } + tpMap := make(map[string]struct{}) + tpArray := []string{} + for _, tpStr := range backoffTypes { + _, ok := tpMap[tpStr] + if ok { + continue + } + tpMap[tpStr] = struct{}{} + tpArray = append(tpArray, tpStr) + } + slices.Sort(tpArray) + buf.WriteByte('[') + for i, tp := range tpArray { + if i > 0 { + buf.WriteString(" ") + } + buf.WriteString(tp) + } + buf.WriteByte(']') +} + +// RURuntimeStats is a wrapper of util.RUDetails, +// which implements the RuntimeStats interface. +type RURuntimeStats struct { + *util.RUDetails +} + +// String implements the RuntimeStats interface. +func (e *RURuntimeStats) String() string { + if e.RUDetails != nil { + buf := bytes.NewBuffer(make([]byte, 0, 8)) + buf.WriteString("RU:") + buf.WriteString(strconv.FormatFloat(e.RRU()+e.WRU(), 'f', 2, 64)) + return buf.String() + } + return "" +} + +// Clone implements the RuntimeStats interface. +func (e *RURuntimeStats) Clone() RuntimeStats { + return &RURuntimeStats{RUDetails: e.RUDetails.Clone()} +} + +// Merge implements the RuntimeStats interface. +func (e *RURuntimeStats) Merge(other RuntimeStats) { + if tmp, ok := other.(*RURuntimeStats); ok { + if e.RUDetails != nil { + e.RUDetails.Merge(tmp.RUDetails) + } else { + e.RUDetails = tmp.RUDetails.Clone() + } + } +} + +// Tp implements the RuntimeStats interface. +func (*RURuntimeStats) Tp() int { + return TpRURuntimeStats +} diff --git a/pkg/util/execdetails/tiflash_stats.go b/pkg/util/execdetails/tiflash_stats.go new file mode 100644 index 0000000000..5bd06125f3 --- /dev/null +++ b/pkg/util/execdetails/tiflash_stats.go @@ -0,0 +1,742 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package execdetails
+
+import (
+ "bytes"
+ "fmt"
+ "maps"
+ "math"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/pingcap/kvproto/pkg/resource_manager"
+ "github.com/pingcap/tipb/go-tipb"
+ "github.com/tikv/client-go/v2/util"
+)
+
+// TiflashStats contains tiflash execution stats.
+type TiflashStats struct {
+ scanContext TiFlashScanContext
+ waitSummary TiFlashWaitSummary
+ networkSummary TiFlashNetworkTrafficSummary
+}
+
+// TiFlashScanContext is used to express the table scan information in tiflash.
+type TiFlashScanContext struct {
+ dmfileDataScannedRows uint64
+ dmfileDataSkippedRows uint64
+ dmfileMvccScannedRows uint64
+ dmfileMvccSkippedRows uint64
+ dmfileLmFilterScannedRows uint64
+ dmfileLmFilterSkippedRows uint64
+ totalDmfileRsCheckMs uint64
+ totalDmfileReadMs uint64
+ totalBuildSnapshotMs uint64
+ localRegions uint64
+ remoteRegions uint64
+ totalLearnerReadMs uint64
+ disaggReadCacheHitBytes uint64
+ disaggReadCacheMissBytes uint64
+ segments uint64
+ readTasks uint64
+ deltaRows uint64
+ deltaBytes uint64
+ mvccInputRows uint64
+ mvccInputBytes uint64
+ mvccOutputRows uint64
+ totalBuildBitmapMs uint64
+ totalBuildInputStreamMs uint64
+ staleReadRegions uint64
+ minLocalStreamMs uint64
+ maxLocalStreamMs uint64
+ minRemoteStreamMs uint64
+ maxRemoteStreamMs uint64
+ regionsOfInstance map[string]uint64
+
+ // vector index related
+
+ vectorIdxLoadFromS3 uint64
+ vectorIdxLoadFromDisk uint64
+ vectorIdxLoadFromCache uint64
+ vectorIdxLoadTimeMs uint64
+ vectorIdxSearchTimeMs uint64
+ vectorIdxSearchVisitedNodes uint64
+ vectorIdxSearchDiscardedNodes uint64
+ vectorIdxReadVecTimeMs uint64
+ vectorIdxReadOthersTimeMs uint64
+
+ // fts related
+
+ ftsNFromInmemoryNoindex uint32
+ ftsNFromTinyIndex uint32
+ ftsNFromTinyNoindex uint32
+ ftsNFromDmfIndex uint32
+ ftsNFromDmfNoindex uint32
+ ftsRowsFromInmemoryNoindex uint64
+ ftsRowsFromTinyIndex uint64
+ ftsRowsFromTinyNoindex uint64
+ ftsRowsFromDmfIndex uint64
+ ftsRowsFromDmfNoindex uint64
+ ftsIdxLoadTotalMs uint64
+ ftsIdxLoadFromCache uint32
+ ftsIdxLoadFromColumnFile uint32
+ ftsIdxLoadFromStableS3 uint32
+ ftsIdxLoadFromStableDisk uint32
+ ftsIdxSearchN uint32
+ ftsIdxSearchTotalMs uint64
+ ftsIdxDmSearchRows uint64
+ ftsIdxDmTotalReadFtsMs uint64
+ ftsIdxDmTotalReadOthersMs uint64
+ ftsIdxTinySearchRows uint64
+ ftsIdxTinyTotalReadFtsMs uint64
+ ftsIdxTinyTotalReadOthersMs uint64
+ ftsBruteTotalReadMs uint64
+ ftsBruteTotalSearchMs uint64
+
+ // inverted index related
+
+ invertedIdxLoadFromS3 uint32
+ invertedIdxLoadFromDisk uint32
+ invertedIdxLoadFromCache uint32
+ invertedIdxLoadTimeMs uint64
+ invertedIdxSearchTimeMs uint64
+ invertedIdxSearchSkippedPacks uint32
+ invertedIdxIndexedRows uint64
+ invertedIdxSearchSelectedRows uint64
+}
+
+// Clone implements the deep copy of *TiFlashScanContext.
+func (context *TiFlashScanContext) Clone() TiFlashScanContext {
+ newContext := TiFlashScanContext{
+ dmfileDataScannedRows: context.dmfileDataScannedRows,
+ dmfileDataSkippedRows: context.dmfileDataSkippedRows,
+ dmfileMvccScannedRows: context.dmfileMvccScannedRows,
+ dmfileMvccSkippedRows: 
context.dmfileMvccSkippedRows, + dmfileLmFilterScannedRows: context.dmfileLmFilterScannedRows, + dmfileLmFilterSkippedRows: context.dmfileLmFilterSkippedRows, + totalDmfileRsCheckMs: context.totalDmfileRsCheckMs, + totalDmfileReadMs: context.totalDmfileReadMs, + totalBuildSnapshotMs: context.totalBuildSnapshotMs, + localRegions: context.localRegions, + remoteRegions: context.remoteRegions, + totalLearnerReadMs: context.totalLearnerReadMs, + disaggReadCacheHitBytes: context.disaggReadCacheHitBytes, + disaggReadCacheMissBytes: context.disaggReadCacheMissBytes, + segments: context.segments, + readTasks: context.readTasks, + deltaRows: context.deltaRows, + deltaBytes: context.deltaBytes, + mvccInputRows: context.mvccInputRows, + mvccInputBytes: context.mvccInputBytes, + mvccOutputRows: context.mvccOutputRows, + totalBuildBitmapMs: context.totalBuildBitmapMs, + totalBuildInputStreamMs: context.totalBuildInputStreamMs, + staleReadRegions: context.staleReadRegions, + minLocalStreamMs: context.minLocalStreamMs, + maxLocalStreamMs: context.maxLocalStreamMs, + minRemoteStreamMs: context.minRemoteStreamMs, + maxRemoteStreamMs: context.maxRemoteStreamMs, + regionsOfInstance: make(map[string]uint64), + + vectorIdxLoadFromS3: context.vectorIdxLoadFromS3, + vectorIdxLoadFromDisk: context.vectorIdxLoadFromDisk, + vectorIdxLoadFromCache: context.vectorIdxLoadFromCache, + vectorIdxLoadTimeMs: context.vectorIdxLoadTimeMs, + vectorIdxSearchTimeMs: context.vectorIdxSearchTimeMs, + vectorIdxSearchVisitedNodes: context.vectorIdxSearchVisitedNodes, + vectorIdxSearchDiscardedNodes: context.vectorIdxSearchDiscardedNodes, + vectorIdxReadVecTimeMs: context.vectorIdxReadVecTimeMs, + vectorIdxReadOthersTimeMs: context.vectorIdxReadOthersTimeMs, + + ftsNFromInmemoryNoindex: context.ftsNFromInmemoryNoindex, + ftsNFromTinyIndex: context.ftsNFromTinyIndex, + ftsNFromTinyNoindex: context.ftsNFromTinyNoindex, + ftsNFromDmfIndex: context.ftsNFromDmfIndex, + ftsNFromDmfNoindex: context.ftsNFromDmfNoindex, + ftsRowsFromInmemoryNoindex: context.ftsRowsFromInmemoryNoindex, + ftsRowsFromTinyIndex: context.ftsRowsFromTinyIndex, + ftsRowsFromTinyNoindex: context.ftsRowsFromTinyNoindex, + ftsRowsFromDmfIndex: context.ftsRowsFromDmfIndex, + ftsRowsFromDmfNoindex: context.ftsRowsFromDmfNoindex, + ftsIdxLoadTotalMs: context.ftsIdxLoadTotalMs, + ftsIdxLoadFromCache: context.ftsIdxLoadFromCache, + ftsIdxLoadFromColumnFile: context.ftsIdxLoadFromColumnFile, + ftsIdxLoadFromStableS3: context.ftsIdxLoadFromStableS3, + ftsIdxLoadFromStableDisk: context.ftsIdxLoadFromStableDisk, + ftsIdxSearchN: context.ftsIdxSearchN, + ftsIdxSearchTotalMs: context.ftsIdxSearchTotalMs, + ftsIdxDmSearchRows: context.ftsIdxDmSearchRows, + ftsIdxDmTotalReadFtsMs: context.ftsIdxDmTotalReadFtsMs, + ftsIdxDmTotalReadOthersMs: context.ftsIdxDmTotalReadOthersMs, + ftsIdxTinySearchRows: context.ftsIdxTinySearchRows, + ftsIdxTinyTotalReadFtsMs: context.ftsIdxTinyTotalReadFtsMs, + ftsIdxTinyTotalReadOthersMs: context.ftsIdxTinyTotalReadOthersMs, + ftsBruteTotalReadMs: context.ftsBruteTotalReadMs, + ftsBruteTotalSearchMs: context.ftsBruteTotalSearchMs, + + invertedIdxLoadFromS3: context.invertedIdxLoadFromS3, + invertedIdxLoadFromDisk: context.invertedIdxLoadFromDisk, + invertedIdxLoadFromCache: context.invertedIdxLoadFromCache, + invertedIdxLoadTimeMs: context.invertedIdxLoadTimeMs, + invertedIdxSearchTimeMs: context.invertedIdxSearchTimeMs, + invertedIdxSearchSkippedPacks: context.invertedIdxSearchSkippedPacks, + invertedIdxIndexedRows: context.invertedIdxIndexedRows, + 
invertedIdxSearchSelectedRows: context.invertedIdxSearchSelectedRows, + } + maps.Copy(newContext.regionsOfInstance, context.regionsOfInstance) + return newContext +} + +func (context *TiFlashScanContext) String() string { + var output []string + if context.vectorIdxLoadFromS3+context.vectorIdxLoadFromDisk+context.vectorIdxLoadFromCache > 0 { + var items []string + items = append(items, fmt.Sprintf("load:{total:%dms,from_s3:%d,from_disk:%d,from_cache:%d}", context.vectorIdxLoadTimeMs, context.vectorIdxLoadFromS3, context.vectorIdxLoadFromDisk, context.vectorIdxLoadFromCache)) + items = append(items, fmt.Sprintf("search:{total:%dms,visited_nodes:%d,discarded_nodes:%d}", context.vectorIdxSearchTimeMs, context.vectorIdxSearchVisitedNodes, context.vectorIdxSearchDiscardedNodes)) + items = append(items, fmt.Sprintf("read:{vec_total:%dms,others_total:%dms}", context.vectorIdxReadVecTimeMs, context.vectorIdxReadOthersTimeMs)) + output = append(output, "vector_idx:{"+strings.Join(items, ",")+"}") + } + if context.invertedIdxLoadFromS3+context.invertedIdxLoadFromDisk+context.invertedIdxLoadFromCache > 0 { + var items []string + items = append(items, fmt.Sprintf("load:{total:%dms,from_s3:%d,from_disk:%d,from_cache:%d}", context.invertedIdxLoadTimeMs, context.invertedIdxLoadFromS3, context.invertedIdxLoadFromDisk, context.invertedIdxLoadFromCache)) + items = append(items, fmt.Sprintf("search:{total:%dms,skipped_packs:%d,indexed_rows:%d,selected_rows:%d}", context.invertedIdxSearchTimeMs, context.invertedIdxSearchSkippedPacks, context.invertedIdxIndexedRows, context.invertedIdxSearchSelectedRows)) + output = append(output, "inverted_idx:{"+strings.Join(items, ",")+"}") + } + if context.ftsNFromInmemoryNoindex+context.ftsNFromTinyIndex+context.ftsNFromTinyNoindex+context.ftsNFromDmfIndex+context.ftsNFromDmfNoindex > 0 { + var items []string + items = append(items, fmt.Sprintf("hit_rows:{delta:%d,dmf:%d}", context.ftsRowsFromTinyIndex, context.ftsRowsFromDmfIndex)) + items = append(items, fmt.Sprintf("miss_rows:{mem:%d,delta:%d,dmf:%d}", context.ftsRowsFromInmemoryNoindex, context.ftsRowsFromTinyNoindex, context.ftsRowsFromDmfNoindex)) + items = append(items, fmt.Sprintf("idx_load:{total:%dms,from:{s3:%d,disk:%d,cache:%d}}", context.ftsIdxLoadTotalMs, context.ftsIdxLoadFromStableS3, context.ftsIdxLoadFromStableDisk+context.ftsIdxLoadFromColumnFile, context.ftsIdxLoadFromCache)) + avg := uint64(0) + if context.ftsIdxSearchN > 0 { + avg = context.ftsIdxSearchTotalMs / uint64(context.ftsIdxSearchN) + } + items = append(items, fmt.Sprintf("idx_search:{total:%dms,avg:%dms}", context.ftsIdxSearchTotalMs, avg)) + items = append(items, fmt.Sprintf("idx_read:{rows:%d,fts_total:%dms,others_total:%dms}", context.ftsIdxDmSearchRows+context.ftsIdxTinySearchRows, context.ftsIdxDmTotalReadFtsMs+context.ftsIdxTinyTotalReadFtsMs, context.ftsIdxDmTotalReadOthersMs+context.ftsIdxTinyTotalReadOthersMs)) + items = append(items, fmt.Sprintf("miss:{read:%dms,search:%dms}", context.ftsBruteTotalReadMs, context.ftsBruteTotalSearchMs)) + output = append(output, "fts:{"+strings.Join(items, ",")+"}") + } + + regionBalanceInfo := "none" + if len(context.regionsOfInstance) > 0 { + maxNum := uint64(0) + minNum := uint64(math.MaxUint64) + for _, v := range context.regionsOfInstance { + if v > maxNum { + maxNum = v + } + if v > 0 && v < minNum { + minNum = v + } + } + regionBalanceInfo = fmt.Sprintf("{instance_num: %d, max/min: %d/%d=%f}", + len(context.regionsOfInstance), + maxNum, + minNum, + float64(maxNum)/float64(minNum)) + } + 
dmfileDisaggInfo := ""
+ if context.disaggReadCacheHitBytes != 0 || context.disaggReadCacheMissBytes != 0 {
+ dmfileDisaggInfo = fmt.Sprintf(", disagg_cache_hit_bytes: %d, disagg_cache_miss_bytes: %d",
+ context.disaggReadCacheHitBytes,
+ context.disaggReadCacheMissBytes)
+ }
+ remoteStreamInfo := ""
+ if context.minRemoteStreamMs != 0 || context.maxRemoteStreamMs != 0 {
+ remoteStreamInfo = fmt.Sprintf("min_remote_stream:%dms, max_remote_stream:%dms, ", context.minRemoteStreamMs, context.maxRemoteStreamMs)
+ }
+
+ // note: "tot" is short for "total"
+ output = append(output, fmt.Sprintf("tiflash_scan:{"+
+ "mvcc_input_rows:%d, "+
+ "mvcc_input_bytes:%d, "+
+ "mvcc_output_rows:%d, "+
+ "local_regions:%d, "+
+ "remote_regions:%d, "+
+ "tot_learner_read:%dms, "+
+ "region_balance:%s, "+
+ "delta_rows:%d, "+
+ "delta_bytes:%d, "+
+ "segments:%d, "+
+ "stale_read_regions:%d, "+
+ "tot_build_snapshot:%dms, "+
+ "tot_build_bitmap:%dms, "+
+ "tot_build_inputstream:%dms, "+
+ "min_local_stream:%dms, "+
+ "max_local_stream:%dms, "+
+ "%s"+ // remote stream info
+ "dtfile:{"+
+ "data_scanned_rows:%d, "+
+ "data_skipped_rows:%d, "+
+ "mvcc_scanned_rows:%d, "+
+ "mvcc_skipped_rows:%d, "+
+ "lm_filter_scanned_rows:%d, "+
+ "lm_filter_skipped_rows:%d, "+
+ "tot_rs_index_check:%dms, "+
+ "tot_read:%dms"+
+ "%s}"+ // Disagg cache info of DMFile
+ "}",
+ context.mvccInputRows,
+ context.mvccInputBytes,
+ context.mvccOutputRows,
+ context.localRegions,
+ context.remoteRegions,
+ context.totalLearnerReadMs,
+ regionBalanceInfo,
+ context.deltaRows,
+ context.deltaBytes,
+ context.segments,
+ context.staleReadRegions,
+ context.totalBuildSnapshotMs,
+ context.totalBuildBitmapMs,
+ context.totalBuildInputStreamMs,
+ context.minLocalStreamMs,
+ context.maxLocalStreamMs,
+ remoteStreamInfo,
+ context.dmfileDataScannedRows,
+ context.dmfileDataSkippedRows,
+ context.dmfileMvccScannedRows,
+ context.dmfileMvccSkippedRows,
+ context.dmfileLmFilterScannedRows,
+ context.dmfileLmFilterSkippedRows,
+ context.totalDmfileRsCheckMs,
+ context.totalDmfileReadMs,
+ dmfileDisaggInfo,
+ ))
+
+ return strings.Join(output, ", ")
+}
+
+// Merge sums up the information of another TiFlashScanContext into this one.
+func (context *TiFlashScanContext) Merge(other TiFlashScanContext) {
+ context.dmfileDataScannedRows += other.dmfileDataScannedRows
+ context.dmfileDataSkippedRows += other.dmfileDataSkippedRows
+ context.dmfileMvccScannedRows += other.dmfileMvccScannedRows
+ context.dmfileMvccSkippedRows += other.dmfileMvccSkippedRows
+ context.dmfileLmFilterScannedRows += other.dmfileLmFilterScannedRows
+ context.dmfileLmFilterSkippedRows += other.dmfileLmFilterSkippedRows
+ context.totalDmfileRsCheckMs += other.totalDmfileRsCheckMs
+ context.totalDmfileReadMs += other.totalDmfileReadMs
+ context.totalBuildSnapshotMs += other.totalBuildSnapshotMs
+ context.localRegions += other.localRegions
+ context.remoteRegions += other.remoteRegions
+ context.totalLearnerReadMs += other.totalLearnerReadMs
+ context.disaggReadCacheHitBytes += other.disaggReadCacheHitBytes
+ context.disaggReadCacheMissBytes += other.disaggReadCacheMissBytes
+ context.segments += other.segments
+ context.readTasks += other.readTasks
+ context.deltaRows += other.deltaRows
+ context.deltaBytes += other.deltaBytes
+ context.mvccInputRows += other.mvccInputRows
+ context.mvccInputBytes += other.mvccInputBytes
+ context.mvccOutputRows += other.mvccOutputRows
+ context.totalBuildBitmapMs += other.totalBuildBitmapMs
+ context.totalBuildInputStreamMs += other.totalBuildInputStreamMs
+ 
context.staleReadRegions += other.staleReadRegions + + context.vectorIdxLoadFromS3 += other.vectorIdxLoadFromS3 + context.vectorIdxLoadFromDisk += other.vectorIdxLoadFromDisk + context.vectorIdxLoadFromCache += other.vectorIdxLoadFromCache + context.vectorIdxLoadTimeMs += other.vectorIdxLoadTimeMs + context.vectorIdxSearchTimeMs += other.vectorIdxSearchTimeMs + context.vectorIdxSearchVisitedNodes += other.vectorIdxSearchVisitedNodes + context.vectorIdxSearchDiscardedNodes += other.vectorIdxSearchDiscardedNodes + context.vectorIdxReadVecTimeMs += other.vectorIdxReadVecTimeMs + context.vectorIdxReadOthersTimeMs += other.vectorIdxReadOthersTimeMs + + context.ftsNFromInmemoryNoindex += other.ftsNFromInmemoryNoindex + context.ftsNFromTinyIndex += other.ftsNFromTinyIndex + context.ftsNFromTinyNoindex += other.ftsNFromTinyNoindex + context.ftsNFromDmfIndex += other.ftsNFromDmfIndex + context.ftsNFromDmfNoindex += other.ftsNFromDmfNoindex + context.ftsRowsFromInmemoryNoindex += other.ftsRowsFromInmemoryNoindex + context.ftsRowsFromTinyIndex += other.ftsRowsFromTinyIndex + context.ftsRowsFromTinyNoindex += other.ftsRowsFromTinyNoindex + context.ftsRowsFromDmfIndex += other.ftsRowsFromDmfIndex + context.ftsRowsFromDmfNoindex += other.ftsRowsFromDmfNoindex + context.ftsIdxLoadTotalMs += other.ftsIdxLoadTotalMs + context.ftsIdxLoadFromCache += other.ftsIdxLoadFromCache + context.ftsIdxLoadFromColumnFile += other.ftsIdxLoadFromColumnFile + context.ftsIdxLoadFromStableS3 += other.ftsIdxLoadFromStableS3 + context.ftsIdxLoadFromStableDisk += other.ftsIdxLoadFromStableDisk + context.ftsIdxSearchN += other.ftsIdxSearchN + context.ftsIdxSearchTotalMs += other.ftsIdxSearchTotalMs + context.ftsIdxDmSearchRows += other.ftsIdxDmSearchRows + context.ftsIdxDmTotalReadFtsMs += other.ftsIdxDmTotalReadFtsMs + context.ftsIdxDmTotalReadOthersMs += other.ftsIdxDmTotalReadOthersMs + context.ftsIdxTinySearchRows += other.ftsIdxTinySearchRows + context.ftsIdxTinyTotalReadFtsMs += other.ftsIdxTinyTotalReadFtsMs + context.ftsIdxTinyTotalReadOthersMs += other.ftsIdxTinyTotalReadOthersMs + context.ftsBruteTotalReadMs += other.ftsBruteTotalReadMs + context.ftsBruteTotalSearchMs += other.ftsBruteTotalSearchMs + + context.invertedIdxLoadFromS3 += other.invertedIdxLoadFromS3 + context.invertedIdxLoadFromDisk += other.invertedIdxLoadFromDisk + context.invertedIdxLoadFromCache += other.invertedIdxLoadFromCache + context.invertedIdxLoadTimeMs += other.invertedIdxLoadTimeMs + context.invertedIdxSearchTimeMs += other.invertedIdxSearchTimeMs + context.invertedIdxSearchSkippedPacks += other.invertedIdxSearchSkippedPacks + context.invertedIdxIndexedRows += other.invertedIdxIndexedRows + context.invertedIdxSearchSelectedRows += other.invertedIdxSearchSelectedRows + + if context.minLocalStreamMs == 0 || other.minLocalStreamMs < context.minLocalStreamMs { + context.minLocalStreamMs = other.minLocalStreamMs + } + if other.maxLocalStreamMs > context.maxLocalStreamMs { + context.maxLocalStreamMs = other.maxLocalStreamMs + } + if context.minRemoteStreamMs == 0 || other.minRemoteStreamMs < context.minRemoteStreamMs { + context.minRemoteStreamMs = other.minRemoteStreamMs + } + if other.maxRemoteStreamMs > context.maxRemoteStreamMs { + context.maxRemoteStreamMs = other.maxRemoteStreamMs + } + + if context.regionsOfInstance == nil { + context.regionsOfInstance = make(map[string]uint64) + } + for k, v := range other.regionsOfInstance { + context.regionsOfInstance[k] += v + } +} + +func (context *TiFlashScanContext) mergeExecSummary(summary 
*tipb.TiFlashScanContext) { + if summary == nil { + return + } + context.dmfileDataScannedRows += summary.GetDmfileDataScannedRows() + context.dmfileDataSkippedRows += summary.GetDmfileDataSkippedRows() + context.dmfileMvccScannedRows += summary.GetDmfileMvccScannedRows() + context.dmfileMvccSkippedRows += summary.GetDmfileMvccSkippedRows() + context.dmfileLmFilterScannedRows += summary.GetDmfileLmFilterScannedRows() + context.dmfileLmFilterSkippedRows += summary.GetDmfileLmFilterSkippedRows() + context.totalDmfileRsCheckMs += summary.GetTotalDmfileRsCheckMs() + context.totalDmfileReadMs += summary.GetTotalDmfileReadMs() + context.totalBuildSnapshotMs += summary.GetTotalBuildSnapshotMs() + context.localRegions += summary.GetLocalRegions() + context.remoteRegions += summary.GetRemoteRegions() + context.totalLearnerReadMs += summary.GetTotalLearnerReadMs() + context.disaggReadCacheHitBytes += summary.GetDisaggReadCacheHitBytes() + context.disaggReadCacheMissBytes += summary.GetDisaggReadCacheMissBytes() + context.segments += summary.GetSegments() + context.readTasks += summary.GetReadTasks() + context.deltaRows += summary.GetDeltaRows() + context.deltaBytes += summary.GetDeltaBytes() + context.mvccInputRows += summary.GetMvccInputRows() + context.mvccInputBytes += summary.GetMvccInputBytes() + context.mvccOutputRows += summary.GetMvccOutputRows() + context.totalBuildBitmapMs += summary.GetTotalBuildBitmapMs() + context.totalBuildInputStreamMs += summary.GetTotalBuildInputstreamMs() + context.staleReadRegions += summary.GetStaleReadRegions() + + context.vectorIdxLoadFromS3 += summary.GetVectorIdxLoadFromS3() + context.vectorIdxLoadFromDisk += summary.GetVectorIdxLoadFromDisk() + context.vectorIdxLoadFromCache += summary.GetVectorIdxLoadFromCache() + context.vectorIdxLoadTimeMs += summary.GetVectorIdxLoadTimeMs() + context.vectorIdxSearchTimeMs += summary.GetVectorIdxSearchTimeMs() + context.vectorIdxSearchVisitedNodes += summary.GetVectorIdxSearchVisitedNodes() + context.vectorIdxSearchDiscardedNodes += summary.GetVectorIdxSearchDiscardedNodes() + context.vectorIdxReadVecTimeMs += summary.GetVectorIdxReadVecTimeMs() + context.vectorIdxReadOthersTimeMs += summary.GetVectorIdxReadOthersTimeMs() + + context.ftsNFromInmemoryNoindex += summary.GetFtsNFromInmemoryNoindex() + context.ftsNFromTinyIndex += summary.GetFtsNFromTinyIndex() + context.ftsNFromTinyNoindex += summary.GetFtsNFromTinyNoindex() + context.ftsNFromDmfIndex += summary.GetFtsNFromDmfIndex() + context.ftsNFromDmfNoindex += summary.GetFtsNFromDmfNoindex() + context.ftsRowsFromInmemoryNoindex += summary.GetFtsRowsFromInmemoryNoindex() + context.ftsRowsFromTinyIndex += summary.GetFtsRowsFromTinyIndex() + context.ftsRowsFromTinyNoindex += summary.GetFtsRowsFromTinyNoindex() + context.ftsRowsFromDmfIndex += summary.GetFtsRowsFromDmfIndex() + context.ftsRowsFromDmfNoindex += summary.GetFtsRowsFromDmfNoindex() + context.ftsIdxLoadTotalMs += summary.GetFtsIdxLoadTotalMs() + context.ftsIdxLoadFromCache += summary.GetFtsIdxLoadFromCache() + context.ftsIdxLoadFromColumnFile += summary.GetFtsIdxLoadFromColumnFile() + context.ftsIdxLoadFromStableS3 += summary.GetFtsIdxLoadFromStableS3() + context.ftsIdxLoadFromStableDisk += summary.GetFtsIdxLoadFromStableDisk() + context.ftsIdxSearchN += summary.GetFtsIdxSearchN() + context.ftsIdxSearchTotalMs += summary.GetFtsIdxSearchTotalMs() + context.ftsIdxDmSearchRows += summary.GetFtsIdxDmSearchRows() + context.ftsIdxDmTotalReadFtsMs += summary.GetFtsIdxDmTotalReadFtsMs() + 
context.ftsIdxDmTotalReadOthersMs += summary.GetFtsIdxDmTotalReadOthersMs() + context.ftsIdxTinySearchRows += summary.GetFtsIdxTinySearchRows() + context.ftsIdxTinyTotalReadFtsMs += summary.GetFtsIdxTinyTotalReadFtsMs() + context.ftsIdxTinyTotalReadOthersMs += summary.GetFtsIdxTinyTotalReadOthersMs() + context.ftsBruteTotalReadMs += summary.GetFtsBruteTotalReadMs() + context.ftsBruteTotalSearchMs += summary.GetFtsBruteTotalSearchMs() + + context.invertedIdxLoadFromS3 += summary.GetInvertedIdxLoadFromS3() + context.invertedIdxLoadFromDisk += summary.GetInvertedIdxLoadFromDisk() + context.invertedIdxLoadFromCache += summary.GetInvertedIdxLoadFromCache() + context.invertedIdxLoadTimeMs += summary.GetInvertedIdxLoadTimeMs() + context.invertedIdxSearchTimeMs += summary.GetInvertedIdxSearchTimeMs() + context.invertedIdxSearchSkippedPacks += summary.GetInvertedIdxSearchSkippedPacks() + context.invertedIdxIndexedRows += summary.GetInvertedIdxIndexedRows() + context.invertedIdxSearchSelectedRows += summary.GetInvertedIdxSearchSelectedRows() + + if context.minLocalStreamMs == 0 || summary.GetMinLocalStreamMs() < context.minLocalStreamMs { + context.minLocalStreamMs = summary.GetMinLocalStreamMs() + } + if summary.GetMaxLocalStreamMs() > context.maxLocalStreamMs { + context.maxLocalStreamMs = summary.GetMaxLocalStreamMs() + } + if context.minRemoteStreamMs == 0 || summary.GetMinRemoteStreamMs() < context.minRemoteStreamMs { + context.minRemoteStreamMs = summary.GetMinRemoteStreamMs() + } + if summary.GetMaxRemoteStreamMs() > context.maxRemoteStreamMs { + context.maxRemoteStreamMs = summary.GetMaxRemoteStreamMs() + } + + if context.regionsOfInstance == nil { + context.regionsOfInstance = make(map[string]uint64, len(summary.GetRegionsOfInstance())) + } + for _, instance := range summary.GetRegionsOfInstance() { + context.regionsOfInstance[instance.GetInstanceId()] += instance.GetRegionNum() + } +} + +// Empty check whether TiFlashScanContext is Empty, if scan no pack and skip no pack, we regard it as empty +func (context *TiFlashScanContext) Empty() bool { + res := context.dmfileDataScannedRows == 0 && + context.dmfileDataSkippedRows == 0 && + context.dmfileMvccScannedRows == 0 && + context.dmfileMvccSkippedRows == 0 && + context.dmfileLmFilterScannedRows == 0 && + context.dmfileLmFilterSkippedRows == 0 && + context.localRegions == 0 && + context.remoteRegions == 0 && + context.vectorIdxLoadFromDisk == 0 && + context.vectorIdxLoadFromCache == 0 && + context.vectorIdxLoadFromS3 == 0 && + context.invertedIdxLoadFromDisk == 0 && + context.invertedIdxLoadFromCache == 0 && + context.invertedIdxLoadFromS3 == 0 && + context.ftsNFromInmemoryNoindex == 0 && + context.ftsNFromTinyIndex == 0 && + context.ftsNFromTinyNoindex == 0 && + context.ftsNFromDmfIndex == 0 && + context.ftsNFromDmfNoindex == 0 + return res +} + +// TiFlashWaitSummary is used to express all kinds of wait information in tiflash +type TiFlashWaitSummary struct { + // keep execution time to do merge work, always record the wait time with largest execution time + executionTime uint64 + minTSOWaitTime uint64 + pipelineBreakerWaitTime uint64 + pipelineQueueWaitTime uint64 +} + +// Clone implements the deep copy of * TiFlashWaitSummary +func (waitSummary *TiFlashWaitSummary) Clone() TiFlashWaitSummary { + newSummary := TiFlashWaitSummary{ + executionTime: waitSummary.executionTime, + minTSOWaitTime: waitSummary.minTSOWaitTime, + pipelineBreakerWaitTime: waitSummary.pipelineBreakerWaitTime, + pipelineQueueWaitTime: waitSummary.pipelineQueueWaitTime, 
+	}
+	return newSummary
+}
+
+// String dumps the TiFlashWaitSummary info as a string
+func (waitSummary *TiFlashWaitSummary) String() string {
+	if waitSummary.CanBeIgnored() {
+		return ""
+	}
+	buf := bytes.NewBuffer(make([]byte, 0, 32))
+	buf.WriteString("tiflash_wait: {")
+	empty := true
+	if waitSummary.minTSOWaitTime >= uint64(time.Millisecond) {
+		buf.WriteString("minTSO_wait: ")
+		buf.WriteString(strconv.FormatInt(time.Duration(waitSummary.minTSOWaitTime).Milliseconds(), 10))
+		buf.WriteString("ms")
+		empty = false
+	}
+	if waitSummary.pipelineBreakerWaitTime >= uint64(time.Millisecond) {
+		if !empty {
+			buf.WriteString(", ")
+		}
+		buf.WriteString("pipeline_breaker_wait: ")
+		buf.WriteString(strconv.FormatInt(time.Duration(waitSummary.pipelineBreakerWaitTime).Milliseconds(), 10))
+		buf.WriteString("ms")
+		empty = false
+	}
+	if waitSummary.pipelineQueueWaitTime >= uint64(time.Millisecond) {
+		if !empty {
+			buf.WriteString(", ")
+		}
+		buf.WriteString("pipeline_queue_wait: ")
+		buf.WriteString(strconv.FormatInt(time.Duration(waitSummary.pipelineQueueWaitTime).Milliseconds(), 10))
+		buf.WriteString("ms")
+	}
+	buf.WriteString("}")
+	return buf.String()
+}
+
+// Merge keeps the wait information that comes with the largest execution time.
+func (waitSummary *TiFlashWaitSummary) Merge(other TiFlashWaitSummary) {
+	if waitSummary.executionTime < other.executionTime {
+		waitSummary.executionTime = other.executionTime
+		waitSummary.minTSOWaitTime = other.minTSOWaitTime
+		waitSummary.pipelineBreakerWaitTime = other.pipelineBreakerWaitTime
+		waitSummary.pipelineQueueWaitTime = other.pipelineQueueWaitTime
+	}
+}
+
+func (waitSummary *TiFlashWaitSummary) mergeExecSummary(summary *tipb.TiFlashWaitSummary, executionTime uint64) {
+	if summary == nil {
+		return
+	}
+	if waitSummary.executionTime < executionTime {
+		waitSummary.executionTime = executionTime
+		waitSummary.minTSOWaitTime = summary.GetMinTSOWaitNs()
+		waitSummary.pipelineBreakerWaitTime = summary.GetPipelineBreakerWaitNs()
+		waitSummary.pipelineQueueWaitTime = summary.GetPipelineQueueWaitNs()
+	}
+}
+
+// CanBeIgnored reports whether the TiFlashWaitSummary can be ignored; not all tidb executors have a significant tiflash wait summary
+func (waitSummary *TiFlashWaitSummary) CanBeIgnored() bool {
+	res := waitSummary.minTSOWaitTime < uint64(time.Millisecond) &&
+		waitSummary.pipelineBreakerWaitTime < uint64(time.Millisecond) &&
+		waitSummary.pipelineQueueWaitTime < uint64(time.Millisecond)
+	return res
+}
+
+// TiFlashNetworkTrafficSummary is used to express network traffic in tiflash
+type TiFlashNetworkTrafficSummary struct {
+	innerZoneSendBytes    uint64
+	interZoneSendBytes    uint64
+	innerZoneReceiveBytes uint64
+	interZoneReceiveBytes uint64
+}
+
+// UpdateTiKVExecDetails updates tikvDetails with the TiFlashNetworkTrafficSummary's values
+func (networkTraffic *TiFlashNetworkTrafficSummary) UpdateTiKVExecDetails(tikvDetails *util.ExecDetails) {
+	if tikvDetails == nil {
+		return
+	}
+	tikvDetails.UnpackedBytesSentMPPCrossZone += int64(networkTraffic.interZoneSendBytes)
+	tikvDetails.UnpackedBytesSentMPPTotal += int64(networkTraffic.interZoneSendBytes)
+	tikvDetails.UnpackedBytesSentMPPTotal += int64(networkTraffic.innerZoneSendBytes)
+
+	tikvDetails.UnpackedBytesReceivedMPPCrossZone += int64(networkTraffic.interZoneReceiveBytes)
+	tikvDetails.UnpackedBytesReceivedMPPTotal += int64(networkTraffic.interZoneReceiveBytes)
+	tikvDetails.UnpackedBytesReceivedMPPTotal += int64(networkTraffic.innerZoneReceiveBytes)
+}
+
+// Clone returns a deep copy of the TiFlashNetworkTrafficSummary.
+func (networkTraffic *TiFlashNetworkTrafficSummary) Clone() TiFlashNetworkTrafficSummary {
+	newSummary := TiFlashNetworkTrafficSummary{
+		innerZoneSendBytes:    networkTraffic.innerZoneSendBytes,
+		interZoneSendBytes:    networkTraffic.interZoneSendBytes,
+		innerZoneReceiveBytes: networkTraffic.innerZoneReceiveBytes,
+		interZoneReceiveBytes: networkTraffic.interZoneReceiveBytes,
+	}
+	return newSummary
+}
+
+// Empty reports whether the TiFlashNetworkTrafficSummary is empty; if there is no network traffic at all, we regard it as empty
+func (networkTraffic *TiFlashNetworkTrafficSummary) Empty() bool {
+	res := networkTraffic.innerZoneSendBytes == 0 &&
+		networkTraffic.interZoneSendBytes == 0 &&
+		networkTraffic.innerZoneReceiveBytes == 0 &&
+		networkTraffic.interZoneReceiveBytes == 0
+	return res
+}
+
+// String dumps the TiFlashNetworkTrafficSummary info as a string
+func (networkTraffic *TiFlashNetworkTrafficSummary) String() string {
+	buf := bytes.NewBuffer(make([]byte, 0, 32))
+	buf.WriteString("tiflash_network: {")
+	empty := true
+	if networkTraffic.innerZoneSendBytes != 0 {
+		buf.WriteString("inner_zone_send_bytes: ")
+		buf.WriteString(strconv.FormatInt(int64(networkTraffic.innerZoneSendBytes), 10))
+		empty = false
+	}
+	if networkTraffic.interZoneSendBytes != 0 {
+		if !empty {
+			buf.WriteString(", ")
+		}
+		buf.WriteString("inter_zone_send_bytes: ")
+		buf.WriteString(strconv.FormatInt(int64(networkTraffic.interZoneSendBytes), 10))
+		empty = false
+	}
+	if networkTraffic.innerZoneReceiveBytes != 0 {
+		if !empty {
+			buf.WriteString(", ")
+		}
+		buf.WriteString("inner_zone_receive_bytes: ")
+		buf.WriteString(strconv.FormatInt(int64(networkTraffic.innerZoneReceiveBytes), 10))
+		empty = false
+	}
+	if networkTraffic.interZoneReceiveBytes != 0 {
+		if !empty {
+			buf.WriteString(", ")
+		}
+		buf.WriteString("inter_zone_receive_bytes: ")
+		buf.WriteString(strconv.FormatInt(int64(networkTraffic.interZoneReceiveBytes), 10))
+	}
+	buf.WriteString("}")
+	return buf.String()
+}
+
+// Merge sums the network traffic of another TiFlashNetworkTrafficSummary into this one.
+func (networkTraffic *TiFlashNetworkTrafficSummary) Merge(other TiFlashNetworkTrafficSummary) {
+	networkTraffic.innerZoneSendBytes += other.innerZoneSendBytes
+	networkTraffic.interZoneSendBytes += other.interZoneSendBytes
+	networkTraffic.innerZoneReceiveBytes += other.innerZoneReceiveBytes
+	networkTraffic.interZoneReceiveBytes += other.interZoneReceiveBytes
+}
+
+func (networkTraffic *TiFlashNetworkTrafficSummary) mergeExecSummary(summary *tipb.TiFlashNetWorkSummary) {
+	if summary == nil {
+		return
+	}
+	networkTraffic.innerZoneSendBytes += *summary.InnerZoneSendBytes
+	networkTraffic.interZoneSendBytes += *summary.InterZoneSendBytes
+	networkTraffic.innerZoneReceiveBytes += *summary.InnerZoneReceiveBytes
+	networkTraffic.interZoneReceiveBytes += *summary.InterZoneReceiveBytes
+}
+
+// GetInterZoneTrafficBytes returns the inter-zone network traffic bytes exchanged
+// between tiflash instances.
+func (networkTraffic *TiFlashNetworkTrafficSummary) GetInterZoneTrafficBytes() uint64 {
+	// NOTE: we only count the inter-zone sent bytes here because tiflash counts
+	// the traffic bytes of every sub request, and for each sub request both sides
+	// count the send and receive traffic. So we use only the send bytes as the
+	// overall traffic to avoid counting it twice. This accounting may look a bit
+	// odd, but it is the decision made on the tiflash side.
+	return networkTraffic.interZoneSendBytes
+}
+
+// MergeTiFlashRUConsumption merges execution summaries from selectResponse into ruDetails.
+func MergeTiFlashRUConsumption(executionSummaries []*tipb.ExecutorExecutionSummary, ruDetails *util.RUDetails) error {
+	newRUDetails := util.NewRUDetails()
+	for _, summary := range executionSummaries {
+		if summary != nil && summary.GetRuConsumption() != nil {
+			tiflashRU := new(resource_manager.Consumption)
+			if err := tiflashRU.Unmarshal(summary.GetRuConsumption()); err != nil {
+				return err
+			}
+			newRUDetails.Update(tiflashRU, 0)
+		}
+	}
+	ruDetails.Merge(newRUDetails)
+	return nil
+}
diff --git a/pkg/util/execdetails/util.go b/pkg/util/execdetails/util.go
index 92a395b930..cbb1d7f949 100644
--- a/pkg/util/execdetails/util.go
+++ b/pkg/util/execdetails/util.go
@@ -15,8 +15,13 @@ package execdetails
 
 import (
+	"cmp"
 	"context"
+	"math"
+	"slices"
+	"time"
+
+	"github.com/influxdata/tdigest"
 	"github.com/tikv/client-go/v2/util"
 )
@@ -45,3 +50,163 @@ func GetExecDetailsFromContext(ctx context.Context) (stmtDetail StmtExecDetails,
 	return
 }
+
+type canGetFloat64 interface {
+	GetFloat64() float64
+}
+
+// Int64 is a wrapper of int64 to implement the canGetFloat64 interface.
+type Int64 int64
+
+// GetFloat64 implements the canGetFloat64 interface.
+func (i Int64) GetFloat64() float64 { return float64(i) }
+
+// Duration is a wrapper of time.Duration to implement the canGetFloat64 interface.
+type Duration time.Duration
+
+// GetFloat64 implements the canGetFloat64 interface.
+func (d Duration) GetFloat64() float64 { return float64(d) }
+
+// DurationWithAddr is a wrapper of time.Duration and string to implement the canGetFloat64 interface.
+type DurationWithAddr struct {
+	D    time.Duration
+	Addr string
+}
+
+// GetFloat64 implements the canGetFloat64 interface.
+func (d DurationWithAddr) GetFloat64() float64 { return float64(d.D) }
+
+// Percentile is a struct to calculate the percentile of a series of values.
+type Percentile[valueType canGetFloat64] struct {
+	values   []valueType
+	size     int
+	isSorted bool
+
+	minVal valueType
+	maxVal valueType
+	sumVal float64
+	dt     *tdigest.TDigest
+}
+
+// Add adds a value to calculate the percentile.
+func (p *Percentile[valueType]) Add(value valueType) {
+	p.isSorted = false
+	p.sumVal += value.GetFloat64()
+	p.size++
+	if p.dt == nil && len(p.values) == 0 {
+		p.minVal = value
+		p.maxVal = value
+	} else {
+		if value.GetFloat64() < p.minVal.GetFloat64() {
+			p.minVal = value
+		}
+		if value.GetFloat64() > p.maxVal.GetFloat64() {
+			p.maxVal = value
+		}
+	}
+	if p.dt == nil {
+		p.values = append(p.values, value)
+		if len(p.values) >= MaxDetailsNumsForOneQuery {
+			p.dt = tdigest.New()
+			for _, v := range p.values {
+				p.dt.Add(v.GetFloat64(), 1)
+			}
+			p.values = nil
+		}
+		return
+	}
+	p.dt.Add(value.GetFloat64(), 1)
+}
+
+// GetPercentile returns the percentile `f` of the values.
+func (p *Percentile[valueType]) GetPercentile(f float64) float64 {
+	if p.dt == nil {
+		if !p.isSorted {
+			p.isSorted = true
+			slices.SortFunc(p.values, func(i, j valueType) int {
+				return cmp.Compare(i.GetFloat64(), j.GetFloat64())
+			})
+		}
+		return p.values[int(float64(len(p.values))*f)].GetFloat64()
+	}
+	return p.dt.Quantile(f)
+}
+
+// GetMax returns the max value.
+func (p *Percentile[valueType]) GetMax() valueType {
+	return p.maxVal
+}
+
+// GetMin returns the min value.
+func (p *Percentile[valueType]) GetMin() valueType {
+	return p.minVal
+}
+
+// MergePercentile merges two Percentile.
+func (p *Percentile[valueType]) MergePercentile(p2 *Percentile[valueType]) {
+	p.isSorted = false
+	if p2.dt == nil {
+		for _, v := range p2.values {
+			p.Add(v)
+		}
+		return
+	}
+	p.sumVal += p2.sumVal
+	p.size += p2.size
+	if p.dt == nil {
+		p.dt = tdigest.New()
+		for _, v := range p.values {
+			p.dt.Add(v.GetFloat64(), 1)
+		}
+		p.values = nil
+	}
+	p.dt.AddCentroidList(p2.dt.Centroids())
+}
+
+// Size returns the size of the values.
+func (p *Percentile[valueType]) Size() int {
+	return p.size
+}
+
+// Sum returns the sum of the values.
+func (p *Percentile[valueType]) Sum() float64 {
+	return p.sumVal
+}
+
+// FormatDuration formats a duration, pruning its precision first so the result
+// stays human readable. The pruning rules are:
+//  1. if the duration is less than 1µs, return the original string.
+//  2. if the readable value is >= 10, keep 1 decimal; otherwise keep 2 decimals. For example:
+//     9.412345ms -> 9.41ms
+//     10.412345ms -> 10.4ms
+//     5.999s -> 6s
+//     100.45µs -> 100.5µs
+func FormatDuration(d time.Duration) string {
+	if d <= time.Microsecond {
+		return d.String()
+	}
+	unit := getUnit(d)
+	if unit == time.Nanosecond {
+		return d.String()
+	}
+	integer := (d / unit) * unit //nolint:durationcheck
+	decimal := float64(d%unit) / float64(unit)
+	if d < 10*unit {
+		decimal = math.Round(decimal*100) / 100
+	} else {
+		decimal = math.Round(decimal*10) / 10
+	}
+	d = integer + time.Duration(decimal*float64(unit))
+	return d.String()
+}
+
+func getUnit(d time.Duration) time.Duration {
+	if d >= time.Second {
+		return time.Second
+	} else if d >= time.Millisecond {
+		return time.Millisecond
+	} else if d >= time.Microsecond {
+		return time.Microsecond
+	}
+	return time.Nanosecond
+}
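
Reviewer note (not part of the patch): the send-only accounting in GetInterZoneTrafficBytes is easiest to see with a package-internal sketch. If instance A sends N inter-zone bytes to B, then A records N sent and B records N received for the same transfer, so summing both sides would double the traffic; counting only the send side yields the true total. A hypothetical example inside package execdetails (the function name is invented for illustration):

package execdetails

// exampleInterZoneTraffic shows why only send bytes feed the inter-zone total.
func exampleInterZoneTraffic() uint64 {
	// A sent 100 inter-zone bytes; B received the same 100 bytes.
	a := TiFlashNetworkTrafficSummary{interZoneSendBytes: 100}
	b := TiFlashNetworkTrafficSummary{interZoneReceiveBytes: 100}
	var total TiFlashNetworkTrafficSummary
	total.Merge(a) // Merge sums all four counters
	total.Merge(b)
	return total.GetInterZoneTrafficBytes() // 100, not 200
}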
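Reviewer note (not part of the patch): a sketch of how a caller might drive MergeTiFlashRUConsumption with a hand-built summary. The Consumption field names and the RUDetails accessors below are assumptions, not confirmed by this diff:

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/resource_manager"
	"github.com/pingcap/tidb/pkg/util/execdetails"
	"github.com/pingcap/tipb/go-tipb"
	"github.com/tikv/client-go/v2/util"
)

func main() {
	// Marshal an RU consumption record the way TiFlash would attach it to an
	// execution summary (RRU/WRU field names assumed).
	ru := &resource_manager.Consumption{RRU: 1.5, WRU: 0.5}
	data, err := ru.Marshal()
	if err != nil {
		panic(err)
	}
	summaries := []*tipb.ExecutorExecutionSummary{{RuConsumption: data}}

	details := util.NewRUDetails()
	if err := execdetails.MergeTiFlashRUConsumption(summaries, details); err != nil {
		panic(err)
	}
	fmt.Println(details.RRU(), details.WRU()) // accessor names assumed
}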
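Reviewer note (not part of the patch): Percentile answers quantiles exactly from a sorted slice until MaxDetailsNumsForOneQuery samples accumulate, then folds everything into a t-digest so memory stays bounded and quantiles become approximate. A minimal usage sketch against the moved API:

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/pkg/util/execdetails"
)

func main() {
	// Track per-task processing times; with only 100 samples this stays on
	// the exact (sorted-slice) path rather than the t-digest path.
	var procTimes execdetails.Percentile[execdetails.Duration]
	for i := 1; i <= 100; i++ {
		procTimes.Add(execdetails.Duration(time.Duration(i) * time.Millisecond))
	}
	fmt.Println(time.Duration(procTimes.GetPercentile(0.95))) // p95
	fmt.Println(time.Duration(procTimes.GetMax()))            // 100ms
	fmt.Println(procTimes.Size())                             // 100
}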
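Reviewer note (not part of the patch): the FormatDuration pruning rules can be sanity-checked with the examples taken from its own doc comment:

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/pkg/util/execdetails"
)

func main() {
	// Expected outputs follow the FormatDuration doc comment above.
	fmt.Println(execdetails.FormatDuration(9412345 * time.Nanosecond))  // 9.41ms
	fmt.Println(execdetails.FormatDuration(10412345 * time.Nanosecond)) // 10.4ms
	fmt.Println(execdetails.FormatDuration(5999 * time.Millisecond))    // 6s
	fmt.Println(execdetails.FormatDuration(100450 * time.Nanosecond))   // 100.5µs
}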