store/tikv: remove execdetails dependency (#24119)
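Summary: ScanDetail and TimeDetail move from util/execdetails into store/tikv/util, so the store/tikv package no longer imports util/execdetails. As part of the move, SnapshotRuntimeStats drops its Tp method and narrows Clone and Merge from the execdetails.RuntimeStats interface to the concrete *SnapshotRuntimeStats type, which removes the type assertions at the call sites below.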
@@ -1176,7 +1176,7 @@ func (e *InsertRuntimeStat) Clone() execdetails.RuntimeStats {
 	}
 	if e.SnapshotRuntimeStats != nil {
 		snapshotStats := e.SnapshotRuntimeStats.Clone()
-		newRs.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
+		newRs.SnapshotRuntimeStats = snapshotStats
 	}
 	if e.BasicRuntimeStats != nil {
 		basicStats := e.BasicRuntimeStats.Clone()
@@ -1194,7 +1194,7 @@ func (e *InsertRuntimeStat) Merge(other execdetails.RuntimeStats) {
 	if tmp.SnapshotRuntimeStats != nil {
 		if e.SnapshotRuntimeStats == nil {
 			snapshotStats := tmp.SnapshotRuntimeStats.Clone()
-			e.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
+			e.SnapshotRuntimeStats = snapshotStats
 		} else {
 			e.SnapshotRuntimeStats.Merge(tmp.SnapshotRuntimeStats)
 		}
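Since Clone now returns the concrete type, the caller-side pattern in the two hunks above simplifies from assert-after-clone to a direct assignment; a minimal before/after sketch:

    // Before: Clone returned the execdetails.RuntimeStats interface,
    // forcing a type assertion back to the concrete type.
    snapshotStats := e.SnapshotRuntimeStats.Clone()
    newRs.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)

    // After: Clone returns *tikv.SnapshotRuntimeStats directly,
    // so the assertion (and its possible panic) disappears.
    newRs.SnapshotRuntimeStats = e.SnapshotRuntimeStats.Clone()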
@@ -580,7 +580,7 @@ func (e *runtimeStatsWithSnapshot) Clone() execdetails.RuntimeStats {
 	newRs := &runtimeStatsWithSnapshot{}
 	if e.SnapshotRuntimeStats != nil {
 		snapshotStats := e.SnapshotRuntimeStats.Clone()
-		newRs.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
+		newRs.SnapshotRuntimeStats = snapshotStats
 	}
 	return newRs
 }
@@ -594,7 +594,7 @@ func (e *runtimeStatsWithSnapshot) Merge(other execdetails.RuntimeStats) {
 	if tmp.SnapshotRuntimeStats != nil {
 		if e.SnapshotRuntimeStats == nil {
 			snapshotStats := tmp.SnapshotRuntimeStats.Clone()
-			e.SnapshotRuntimeStats = snapshotStats.(*tikv.SnapshotRuntimeStats)
+			e.SnapshotRuntimeStats = snapshotStats
 			return
 		}
 		e.SnapshotRuntimeStats.Merge(tmp.SnapshotRuntimeStats)
@@ -542,19 +542,19 @@ func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails, c
 }
 
 // MergeScanDetail merges scan details into self.
-func (sc *StatementContext) MergeScanDetail(scanDetail *execdetails.ScanDetail) {
+func (sc *StatementContext) MergeScanDetail(scanDetail *util.ScanDetail) {
 	// Currently TiFlash cop task does not fill scanDetail, so need to skip it if scanDetail is nil
 	if scanDetail == nil {
 		return
 	}
 	if sc.mu.execDetails.ScanDetail == nil {
-		sc.mu.execDetails.ScanDetail = &execdetails.ScanDetail{}
+		sc.mu.execDetails.ScanDetail = &util.ScanDetail{}
 	}
 	sc.mu.execDetails.ScanDetail.Merge(scanDetail)
 }
 
 // MergeTimeDetail merges time details into self.
-func (sc *StatementContext) MergeTimeDetail(timeDetail execdetails.TimeDetail) {
+func (sc *StatementContext) MergeTimeDetail(timeDetail util.TimeDetail) {
 	sc.mu.execDetails.TimeDetail.ProcessTime += timeDetail.ProcessTime
 	sc.mu.execDetails.TimeDetail.WaitTime += timeDetail.WaitTime
 }
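A short usage sketch of the two merge methods (the values and surrounding setup are illustrative, not from this commit):

    // Fold one cop task's details into the statement context.
    // MergeScanDetail tolerates nil, since TiFlash tasks leave it unset.
    sc.MergeScanDetail(&util.ScanDetail{ProcessedKeys: 10, TotalKeys: 15})
    sc.MergeTimeDetail(util.TimeDetail{
        ProcessTime: time.Second,
        WaitTime:    time.Millisecond,
    })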
@@ -20,6 +20,7 @@ import (
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
+	"github.com/pingcap/tidb/store/tikv/util"
 	"github.com/pingcap/tidb/util/execdetails"
 )
 
@@ -39,7 +40,7 @@ func (s *stmtctxSuit) TestCopTasksDetails(c *C) {
 			CalleeAddress: fmt.Sprintf("%v", i+1),
 			BackoffSleep:  make(map[string]time.Duration),
 			BackoffTimes:  make(map[string]int),
-			TimeDetail: execdetails.TimeDetail{
+			TimeDetail: util.TimeDetail{
 				ProcessTime: time.Second * time.Duration(i+1),
 				WaitTime:    time.Millisecond * time.Duration(i+1),
 			},
@@ -23,6 +23,7 @@ import (
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/store/tikv/util"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/mock"
@@ -152,11 +153,11 @@ func (*testSessionSuite) TestSlowLogFormat(c *C) {
 	execDetail := execdetails.ExecDetails{
 		BackoffTime:  time.Millisecond,
 		RequestCount: 2,
-		ScanDetail: &execdetails.ScanDetail{
+		ScanDetail: &util.ScanDetail{
 			ProcessedKeys: 20001,
 			TotalKeys:     10000,
 		},
-		TimeDetail: execdetails.TimeDetail{
+		TimeDetail: util.TimeDetail{
 			ProcessTime: time.Second * time.Duration(2),
 			WaitTime:    time.Minute,
 		},
@@ -912,8 +912,8 @@ func (worker *copIteratorWorker) handleCopResponse(bo *tikv.Backoffer, rpcCtx *t
 		resp.detail.CalleeAddress = rpcCtx.Addr
 	}
 	resp.respTime = costTime
-	sd := &execdetails.ScanDetail{}
-	td := execdetails.TimeDetail{}
+	sd := &util.ScanDetail{}
+	td := util.TimeDetail{}
 	if pbDetails := resp.pbResp.ExecDetailsV2; pbDetails != nil {
 		// Take values in `ExecDetailsV2` first.
 		if timeDetail := pbDetails.TimeDetail; timeDetail != nil {
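The hunk is truncated here; presumably it goes on to fold the protobuf payload into the freshly created sd and td. A sketch of that continuation, assuming the field names visible on kvrpcpb.ExecDetailsV2:

    if pbDetails := resp.pbResp.ExecDetailsV2; pbDetails != nil {
        // Take values in `ExecDetailsV2` first.
        if timeDetail := pbDetails.TimeDetail; timeDetail != nil {
            td.MergeFromTimeDetail(timeDetail)
        }
        if scanDetail := pbDetails.ScanDetailV2; scanDetail != nil {
            sd.MergeFromScanDetailV2(scanDetail)
        }
    }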
@@ -36,7 +36,6 @@ import (
 	"github.com/pingcap/tidb/store/tikv/tikvrpc"
 	"github.com/pingcap/tidb/store/tikv/unionstore"
 	"github.com/pingcap/tidb/store/tikv/util"
-	"github.com/pingcap/tidb/util/execdetails"
 	"go.uber.org/zap"
 )
 
@@ -525,10 +524,10 @@ func (s *KVSnapshot) mergeExecDetail(detail *pb.ExecDetailsV2) {
 		return
 	}
 	if s.mu.stats.scanDetail == nil {
-		s.mu.stats.scanDetail = &execdetails.ScanDetail{}
+		s.mu.stats.scanDetail = &util.ScanDetail{}
 	}
 	if s.mu.stats.timeDetail == nil {
-		s.mu.stats.timeDetail = &execdetails.TimeDetail{}
+		s.mu.stats.timeDetail = &util.TimeDetail{}
 	}
 	s.mu.stats.scanDetail.MergeFromScanDetailV2(detail.ScanDetailV2)
 	s.mu.stats.timeDetail.MergeFromTimeDetail(detail.TimeDetail)
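MergeFromTimeDetail converts the protobuf's millisecond counters into time.Duration as it accumulates; a small worked example (hypothetical values):

    var td util.TimeDetail
    td.MergeFromTimeDetail(&kvrpcpb.TimeDetail{
        ProcessWallTimeMs: 1500, // adds 1.5s to td.ProcessTime
        WaitWallTimeMs:    150,  // adds 150ms to td.WaitTime
    })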
@@ -706,17 +705,12 @@ type SnapshotRuntimeStats struct {
 	rpcStats       RegionRequestRuntimeStats
 	backoffSleepMS map[BackoffType]int
 	backoffTimes   map[BackoffType]int
-	scanDetail     *execdetails.ScanDetail
-	timeDetail     *execdetails.TimeDetail
-}
-
-// Tp implements the RuntimeStats interface.
-func (rs *SnapshotRuntimeStats) Tp() int {
-	return execdetails.TpSnapshotRuntimeStats
+	scanDetail     *util.ScanDetail
+	timeDetail     *util.TimeDetail
 }
 
 // Clone implements the RuntimeStats interface.
-func (rs *SnapshotRuntimeStats) Clone() execdetails.RuntimeStats {
+func (rs *SnapshotRuntimeStats) Clone() *SnapshotRuntimeStats {
 	newRs := SnapshotRuntimeStats{rpcStats: NewRegionRequestRuntimeStats()}
 	if rs.rpcStats.Stats != nil {
 		for k, v := range rs.rpcStats.Stats {
@@ -737,28 +731,24 @@ func (rs *SnapshotRuntimeStats) Clone() execdetails.RuntimeStats {
 }
 
 // Merge implements the RuntimeStats interface.
-func (rs *SnapshotRuntimeStats) Merge(other execdetails.RuntimeStats) {
-	tmp, ok := other.(*SnapshotRuntimeStats)
-	if !ok {
-		return
-	}
-	if tmp.rpcStats.Stats != nil {
+func (rs *SnapshotRuntimeStats) Merge(other *SnapshotRuntimeStats) {
+	if other.rpcStats.Stats != nil {
 		if rs.rpcStats.Stats == nil {
-			rs.rpcStats.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats, len(tmp.rpcStats.Stats))
+			rs.rpcStats.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats, len(other.rpcStats.Stats))
 		}
-		rs.rpcStats.Merge(tmp.rpcStats)
+		rs.rpcStats.Merge(other.rpcStats)
 	}
-	if len(tmp.backoffSleepMS) > 0 {
+	if len(other.backoffSleepMS) > 0 {
 		if rs.backoffSleepMS == nil {
 			rs.backoffSleepMS = make(map[BackoffType]int)
 		}
 		if rs.backoffTimes == nil {
 			rs.backoffTimes = make(map[BackoffType]int)
 		}
-		for k, v := range tmp.backoffSleepMS {
+		for k, v := range other.backoffSleepMS {
 			rs.backoffSleepMS[k] += v
 		}
-		for k, v := range tmp.backoffTimes {
+		for k, v := range other.backoffTimes {
 			rs.backoffTimes[k] += v
 		}
 	}
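With Tp gone and Clone/Merge narrowed to concrete types, *SnapshotRuntimeStats on its own no longer satisfies execdetails.RuntimeStats; the TiDB-side wrappers seen in the earlier hunks carry the interface instead. A sketch of such an adapter (names invented here), assuming execdetails still exports TpSnapshotRuntimeStats and a Tp/Clone/Merge/String RuntimeStats interface:

    type snapshotStatsAdapter struct {
        SnapshotRuntimeStats *tikv.SnapshotRuntimeStats
    }

    // Tp keeps reporting the execdetails type tag for this stats kind.
    func (a *snapshotStatsAdapter) Tp() int { return execdetails.TpSnapshotRuntimeStats }

    // Clone assigns the concrete result without a type assertion.
    func (a *snapshotStatsAdapter) Clone() execdetails.RuntimeStats {
        newRs := &snapshotStatsAdapter{}
        if a.SnapshotRuntimeStats != nil {
            newRs.SnapshotRuntimeStats = a.SnapshotRuntimeStats.Clone()
        }
        return newRs
    }

    // Merge unwraps the interface once, then delegates to the concrete Merge.
    func (a *snapshotStatsAdapter) Merge(other execdetails.RuntimeStats) {
        tmp, ok := other.(*snapshotStatsAdapter)
        if !ok || tmp.SnapshotRuntimeStats == nil {
            return
        }
        if a.SnapshotRuntimeStats == nil {
            a.SnapshotRuntimeStats = tmp.SnapshotRuntimeStats.Clone()
            return
        }
        a.SnapshotRuntimeStats.Merge(tmp.SnapshotRuntimeStats)
    }

    // String forwards to the snapshot stats' own formatter.
    func (a *snapshotStatsAdapter) String() string {
        if a.SnapshotRuntimeStats == nil {
            return ""
        }
        return a.SnapshotRuntimeStats.String()
    }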
@@ -14,10 +14,16 @@
 package util
 
 import (
+	"bytes"
 	"fmt"
 	"math"
+	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
+
+	"github.com/pingcap/kvproto/pkg/kvrpcpb"
+	"github.com/pingcap/tidb/util/memory"
 )
 
 type commitDetailCtxKeyType struct{}
@@ -179,3 +185,119 @@ func getUnit(d time.Duration) time.Duration {
 	}
 	return time.Nanosecond
 }
+
+// ScanDetail contains coprocessor scan detail information.
+type ScanDetail struct {
+	// TotalKeys is the approximate number of MVCC keys met during scanning. It includes
+	// deleted versions, but does not include RocksDB tombstone keys.
+	TotalKeys int64
+	// ProcessedKeys is the number of user keys scanned from the storage.
+	// It does not include deleted versions or RocksDB tombstone keys.
+	// For Coprocessor requests, it includes keys that have been filtered out by Selection.
+	ProcessedKeys int64
+	// RocksdbDeleteSkippedCount is the total number of deletes and single deletes skipped over during
+	// iteration, i.e. how many RocksDB tombstones are skipped.
+	RocksdbDeleteSkippedCount uint64
+	// RocksdbKeySkippedCount is the total number of internal keys skipped over during iteration.
+	RocksdbKeySkippedCount uint64
+	// RocksdbBlockCacheHitCount is the total number of RocksDB block cache hits.
+	RocksdbBlockCacheHitCount uint64
+	// RocksdbBlockReadCount is the total number of block reads (with IO).
+	RocksdbBlockReadCount uint64
+	// RocksdbBlockReadByte is the total number of bytes from block reads.
+	RocksdbBlockReadByte uint64
+}
+
+// Merge merges scan detail execution details into self.
+func (sd *ScanDetail) Merge(scanDetail *ScanDetail) {
+	atomic.AddInt64(&sd.TotalKeys, scanDetail.TotalKeys)
+	atomic.AddInt64(&sd.ProcessedKeys, scanDetail.ProcessedKeys)
+	atomic.AddUint64(&sd.RocksdbDeleteSkippedCount, scanDetail.RocksdbDeleteSkippedCount)
+	atomic.AddUint64(&sd.RocksdbKeySkippedCount, scanDetail.RocksdbKeySkippedCount)
+	atomic.AddUint64(&sd.RocksdbBlockCacheHitCount, scanDetail.RocksdbBlockCacheHitCount)
+	atomic.AddUint64(&sd.RocksdbBlockReadCount, scanDetail.RocksdbBlockReadCount)
+	atomic.AddUint64(&sd.RocksdbBlockReadByte, scanDetail.RocksdbBlockReadByte)
+}
+
+var zeroScanDetail = ScanDetail{}
+
+// String implements the fmt.Stringer interface.
+func (sd *ScanDetail) String() string {
+	if sd == nil || *sd == zeroScanDetail {
+		return ""
+	}
+	buf := bytes.NewBuffer(make([]byte, 0, 16))
+	buf.WriteString("scan_detail: {")
+	buf.WriteString("total_process_keys: ")
+	buf.WriteString(strconv.FormatInt(sd.ProcessedKeys, 10))
+	buf.WriteString(", total_keys: ")
+	buf.WriteString(strconv.FormatInt(sd.TotalKeys, 10))
+	buf.WriteString(", rocksdb: {")
+	buf.WriteString("delete_skipped_count: ")
+	buf.WriteString(strconv.FormatUint(sd.RocksdbDeleteSkippedCount, 10))
+	buf.WriteString(", key_skipped_count: ")
+	buf.WriteString(strconv.FormatUint(sd.RocksdbKeySkippedCount, 10))
+	buf.WriteString(", block: {")
+	buf.WriteString("cache_hit_count: ")
+	buf.WriteString(strconv.FormatUint(sd.RocksdbBlockCacheHitCount, 10))
+	buf.WriteString(", read_count: ")
+	buf.WriteString(strconv.FormatUint(sd.RocksdbBlockReadCount, 10))
+	buf.WriteString(", read_byte: ")
+	buf.WriteString(memory.FormatBytes(int64(sd.RocksdbBlockReadByte)))
+	buf.WriteString("}}}")
+	return buf.String()
+}
+
+// MergeFromScanDetailV2 merges scan detail from pb into itself.
+func (sd *ScanDetail) MergeFromScanDetailV2(scanDetail *kvrpcpb.ScanDetailV2) {
+	if scanDetail != nil {
+		sd.TotalKeys += int64(scanDetail.TotalVersions)
+		sd.ProcessedKeys += int64(scanDetail.ProcessedVersions)
+		sd.RocksdbDeleteSkippedCount += scanDetail.RocksdbDeleteSkippedCount
+		sd.RocksdbKeySkippedCount += scanDetail.RocksdbKeySkippedCount
+		sd.RocksdbBlockCacheHitCount += scanDetail.RocksdbBlockCacheHitCount
+		sd.RocksdbBlockReadCount += scanDetail.RocksdbBlockReadCount
+		sd.RocksdbBlockReadByte += scanDetail.RocksdbBlockReadByte
+	}
+}
+
+// TimeDetail contains coprocessor time detail information.
+type TimeDetail struct {
+	// ProcessTime is the off-cpu and on-cpu wall time elapsed to actually process the request
+	// payload. It does not include `wait_wall_time`.
+	// This field is very close to the CPU time in most cases. Some wait time spent in RocksDB
+	// cannot be excluded for now, like Mutex wait time, which is included in this field, so
+	// this field is called wall time instead of CPU time.
+	ProcessTime time.Duration
+	// WaitTime is the off-cpu wall time elapsed on the TiKV side. Usually it includes queue
+	// waiting time and other kinds of waiting in series.
+	WaitTime time.Duration
+}
+
+// String implements the fmt.Stringer interface.
+func (td *TimeDetail) String() string {
+	if td == nil {
+		return ""
+	}
+	buf := bytes.NewBuffer(make([]byte, 0, 16))
+	if td.ProcessTime > 0 {
+		buf.WriteString("total_process_time: ")
+		buf.WriteString(FormatDuration(td.ProcessTime))
+	}
+	if td.WaitTime > 0 {
+		if buf.Len() > 0 {
+			buf.WriteString(", ")
+		}
+		buf.WriteString("total_wait_time: ")
+		buf.WriteString(FormatDuration(td.WaitTime))
+	}
+	return buf.String()
+}
+
+// MergeFromTimeDetail merges time detail from pb into itself.
+func (td *TimeDetail) MergeFromTimeDetail(timeDetail *kvrpcpb.TimeDetail) {
+	if timeDetail != nil {
+		td.WaitTime += time.Duration(timeDetail.WaitWallTimeMs) * time.Millisecond
+		td.ProcessTime += time.Duration(timeDetail.ProcessWallTimeMs) * time.Millisecond
+	}
+}
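For reference, a populated ScanDetail renders as a single nested line; a small sketch of the expected output (assuming memory.FormatBytes(100) renders "100 Bytes"):

    sd := util.ScanDetail{
        ProcessedKeys:             10,
        TotalKeys:                 100,
        RocksdbDeleteSkippedCount: 1,
        RocksdbKeySkippedCount:    1,
        RocksdbBlockCacheHitCount: 1,
        RocksdbBlockReadCount:     1,
        RocksdbBlockReadByte:      100,
    }
    fmt.Println(sd.String())
    // scan_detail: {total_process_keys: 10, total_keys: 100, rocksdb: {delete_skipped_count: 1,
    // key_skipped_count: 1, block: {cache_hit_count: 1, read_count: 1, read_byte: 100 Bytes}}}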
@@ -24,9 +24,7 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/tidb/store/tikv/util"
-	"github.com/pingcap/tidb/util/memory"
 	"github.com/pingcap/tipb/go-tipb"
 	"go.uber.org/zap"
 )
@@ -42,8 +40,8 @@ type ExecDetails struct {
 	RequestCount   int
 	CommitDetail   *util.CommitDetails
 	LockKeysDetail *util.LockKeysDetails
-	ScanDetail     *ScanDetail
-	TimeDetail     TimeDetail
+	ScanDetail     *util.ScanDetail
+	TimeDetail     util.TimeDetail
 }
 
 type stmtExecDetailKeyType struct{}
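With these fields retargeted, the import arrow points one way: util/execdetails now depends on store/tikv/util rather than the reverse, which is the point of the commit. Building an ExecDetails looks the same apart from the package qualifier (values borrowed from the slow-log test above):

    details := execdetails.ExecDetails{
        BackoffTime:  time.Millisecond,
        RequestCount: 2,
        ScanDetail:   &util.ScanDetail{ProcessedKeys: 20001, TotalKeys: 10000},
        TimeDetail:   util.TimeDetail{ProcessTime: 2 * time.Second, WaitTime: time.Minute},
    }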
@@ -56,122 +54,6 @@ type StmtExecDetails struct {
 	WriteSQLRespDuration time.Duration
 }
 
-// TimeDetail contains coprocessor time detail information.
-type TimeDetail struct {
-	// WaitWallTimeMs is the off-cpu wall time which is elapsed in TiKV side. Usually this includes queue waiting time and
-	// other kind of waitings in series.
-	ProcessTime time.Duration
-	// Off-cpu and on-cpu wall time elapsed to actually process the request payload. It does not
-	// include `wait_wall_time`.
-	// This field is very close to the CPU time in most cases. Some wait time spend in RocksDB
-	// cannot be excluded for now, like Mutex wait time, which is included in this field, so that
-	// this field is called wall time instead of CPU time.
-	WaitTime time.Duration
-}
-
-// String implements the fmt.Stringer interface.
-func (td *TimeDetail) String() string {
-	if td == nil {
-		return ""
-	}
-	buf := bytes.NewBuffer(make([]byte, 0, 16))
-	if td.ProcessTime > 0 {
-		buf.WriteString("total_process_time: ")
-		buf.WriteString(FormatDuration(td.ProcessTime))
-	}
-	if td.WaitTime > 0 {
-		if buf.Len() > 0 {
-			buf.WriteString(", ")
-		}
-		buf.WriteString("total_wait_time: ")
-		buf.WriteString(FormatDuration(td.WaitTime))
-	}
-	return buf.String()
-}
-
-// MergeFromTimeDetail merges time detail from pb into itself.
-func (td *TimeDetail) MergeFromTimeDetail(timeDetail *kvrpcpb.TimeDetail) {
-	if timeDetail != nil {
-		td.WaitTime += time.Duration(timeDetail.WaitWallTimeMs) * time.Millisecond
-		td.ProcessTime += time.Duration(timeDetail.ProcessWallTimeMs) * time.Millisecond
-	}
-}
-
-// ScanDetail contains coprocessor scan detail information.
-type ScanDetail struct {
-	// TotalKeys is the approximate number of MVCC keys meet during scanning. It includes
-	// deleted versions, but does not include RocksDB tombstone keys.
-	TotalKeys int64
-	// ProcessedKeys is the number of user keys scanned from the storage.
-	// It does not include deleted version or RocksDB tombstone keys.
-	// For Coprocessor requests, it includes keys that has been filtered out by Selection.
-	ProcessedKeys int64
-	// RocksdbDeleteSkippedCount is the total number of deletes and single deletes skipped over during
-	// iteration, i.e. how many RocksDB tombstones are skipped.
-	RocksdbDeleteSkippedCount uint64
-	// RocksdbKeySkippedCount it the total number of internal keys skipped over during iteration.
-	RocksdbKeySkippedCount uint64
-	// RocksdbBlockCacheHitCount is the total number of RocksDB block cache hits.
-	RocksdbBlockCacheHitCount uint64
-	// RocksdbBlockReadCount is the total number of block reads (with IO).
-	RocksdbBlockReadCount uint64
-	// RocksdbBlockReadByte is the total number of bytes from block reads.
-	RocksdbBlockReadByte uint64
-}
-
-// Merge merges scan detail execution details into self.
-func (sd *ScanDetail) Merge(scanDetail *ScanDetail) {
-	atomic.AddInt64(&sd.TotalKeys, scanDetail.TotalKeys)
-	atomic.AddInt64(&sd.ProcessedKeys, scanDetail.ProcessedKeys)
-	atomic.AddUint64(&sd.RocksdbDeleteSkippedCount, scanDetail.RocksdbDeleteSkippedCount)
-	atomic.AddUint64(&sd.RocksdbKeySkippedCount, scanDetail.RocksdbKeySkippedCount)
-	atomic.AddUint64(&sd.RocksdbBlockCacheHitCount, scanDetail.RocksdbBlockCacheHitCount)
-	atomic.AddUint64(&sd.RocksdbBlockReadCount, scanDetail.RocksdbBlockReadCount)
-	atomic.AddUint64(&sd.RocksdbBlockReadByte, scanDetail.RocksdbBlockReadByte)
-}
-
-var zeroScanDetail = ScanDetail{}
-
-// String implements the fmt.Stringer interface.
-func (sd *ScanDetail) String() string {
-	if sd == nil || *sd == zeroScanDetail {
-		return ""
-	}
-	buf := bytes.NewBuffer(make([]byte, 0, 16))
-	buf.WriteString("scan_detail: {")
-	buf.WriteString("total_process_keys: ")
-	buf.WriteString(strconv.FormatInt(sd.ProcessedKeys, 10))
-	buf.WriteString(", total_keys: ")
-	buf.WriteString(strconv.FormatInt(sd.TotalKeys, 10))
-	buf.WriteString(", rocksdb: {")
-	buf.WriteString("delete_skipped_count: ")
-	buf.WriteString(strconv.FormatUint(sd.RocksdbDeleteSkippedCount, 10))
-	buf.WriteString(", key_skipped_count: ")
-	buf.WriteString(strconv.FormatUint(sd.RocksdbKeySkippedCount, 10))
-	buf.WriteString(", block: {")
-	buf.WriteString("cache_hit_count: ")
-	buf.WriteString(strconv.FormatUint(sd.RocksdbBlockCacheHitCount, 10))
-	buf.WriteString(", read_count: ")
-	buf.WriteString(strconv.FormatUint(sd.RocksdbBlockReadCount, 10))
-	buf.WriteString(", read_byte: ")
-	buf.WriteString(memory.FormatBytes(int64(sd.RocksdbBlockReadByte)))
-	buf.WriteString("}}}")
-	return buf.String()
-}
-
-// MergeFromScanDetailV2 merges scan detail from pb into itself.
-func (sd *ScanDetail) MergeFromScanDetailV2(scanDetail *kvrpcpb.ScanDetailV2) {
-	if scanDetail != nil {
-		sd.TotalKeys += int64(scanDetail.TotalVersions)
-		sd.ProcessedKeys += int64(scanDetail.ProcessedVersions)
-		sd.RocksdbDeleteSkippedCount += scanDetail.RocksdbDeleteSkippedCount
-		sd.RocksdbKeySkippedCount += scanDetail.RocksdbKeySkippedCount
-		sd.RocksdbBlockCacheHitCount += scanDetail.RocksdbBlockCacheHitCount
-		sd.RocksdbBlockReadCount += scanDetail.RocksdbBlockReadCount
-		sd.RocksdbBlockReadByte += scanDetail.RocksdbBlockReadByte
-	}
-}
-
 const (
 	// CopTimeStr represents the sum of cop-task time spend in TiDB distSQL.
 	CopTimeStr = "Cop_time"
@@ -435,7 +317,7 @@ type CopRuntimeStats struct {
 	// same tikv-server instance. We have to use a list to maintain all tasks
 	// executed on each instance.
 	stats      map[string][]*basicCopRuntimeStats
-	scanDetail *ScanDetail
+	scanDetail *util.ScanDetail
 	// do not use kv.StoreType because it will meet cycle import error
 	storeType string
 }
@@ -739,7 +621,7 @@ func (e *RuntimeStatsColl) GetOrCreateCopStats(planID int, storeType string) *Co
 	if !ok {
 		copStats = &CopRuntimeStats{
 			stats:      make(map[string][]*basicCopRuntimeStats),
-			scanDetail: &ScanDetail{},
+			scanDetail: &util.ScanDetail{},
 			storeType:  storeType,
 		}
 		e.copStats[planID] = copStats
@@ -769,7 +651,7 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID int, storeType string, addres
 }
 
 // RecordScanDetail records a specific cop tasks's cop detail.
-func (e *RuntimeStatsColl) RecordScanDetail(planID int, storeType string, detail *ScanDetail) {
+func (e *RuntimeStatsColl) RecordScanDetail(planID int, storeType string, detail *util.ScanDetail) {
 	copStats := e.GetOrCreateCopStats(planID, storeType)
 	copStats.scanDetail.Merge(detail)
 }
@@ -58,7 +58,7 @@ func TestString(t *testing.T) {
 			PrewriteRegionNum: 1,
 			TxnRetry:          1,
 		},
-		ScanDetail: &ScanDetail{
+		ScanDetail: &util.ScanDetail{
 			ProcessedKeys:             10,
 			TotalKeys:                 100,
 			RocksdbDeleteSkippedCount: 1,
@@ -67,7 +67,7 @@ func TestString(t *testing.T) {
 			RocksdbBlockReadCount: 1,
 			RocksdbBlockReadByte:  100,
 		},
-		TimeDetail: TimeDetail{
+		TimeDetail: util.TimeDetail{
 			ProcessTime: 2*time.Second + 5*time.Millisecond,
 			WaitTime:    time.Second,
 		},
@@ -103,7 +103,7 @@ func TestCopRuntimeStats(t *testing.T) {
 	stats.RecordOneCopTask(tableScanID, "tikv", "8.8.8.9", mockExecutorExecutionSummary(2, 2, 2))
 	stats.RecordOneCopTask(aggID, "tikv", "8.8.8.8", mockExecutorExecutionSummary(3, 3, 3))
 	stats.RecordOneCopTask(aggID, "tikv", "8.8.8.9", mockExecutorExecutionSummary(4, 4, 4))
-	scanDetail := &ScanDetail{
+	scanDetail := &util.ScanDetail{
 		TotalKeys:                 15,
 		ProcessedKeys:             10,
 		RocksdbDeleteSkippedCount: 5,
@@ -151,7 +151,7 @@ func TestCopRuntimeStats(t *testing.T) {
 		t.Fatalf(cop.String())
 	}
 
-	zeroScanDetail := ScanDetail{}
+	zeroScanDetail := util.ScanDetail{}
 	if zeroScanDetail.String() != "" {
 		t.Fatalf(zeroScanDetail.String())
 	}
@@ -166,7 +166,7 @@ func TestCopRuntimeStatsForTiFlash(t *testing.T) {
 	stats.RecordOneCopTask(aggID, "tiflash", "8.8.8.9", mockExecutorExecutionSummaryForTiFlash(2, 2, 2, 1, "tablescan_"+strconv.Itoa(tableScanID)))
 	stats.RecordOneCopTask(tableScanID, "tiflash", "8.8.8.8", mockExecutorExecutionSummaryForTiFlash(3, 3, 3, 1, "aggregation_"+strconv.Itoa(aggID)))
 	stats.RecordOneCopTask(tableScanID, "tiflash", "8.8.8.9", mockExecutorExecutionSummaryForTiFlash(4, 4, 4, 1, "aggregation_"+strconv.Itoa(aggID)))
-	scanDetail := &ScanDetail{
+	scanDetail := &util.ScanDetail{
 		TotalKeys:                 10,
 		ProcessedKeys:             10,
 		RocksdbDeleteSkippedCount: 10,
@@ -200,7 +200,7 @@ func (s *testStmtSummarySuite) TestAddStatement(c *C) {
 			PrewriteRegionNum: 100,
 			TxnRetry:          10,
 		},
-		ScanDetail: &execdetails.ScanDetail{
+		ScanDetail: &util.ScanDetail{
 			TotalKeys:                 6000,
 			ProcessedKeys:             1500,
 			RocksdbDeleteSkippedCount: 100,
@@ -209,7 +209,7 @@ func (s *testStmtSummarySuite) TestAddStatement(c *C) {
 			RocksdbBlockReadCount: 10,
 			RocksdbBlockReadByte:  1000,
 		},
-		TimeDetail: execdetails.TimeDetail{
+		TimeDetail: util.TimeDetail{
 			ProcessTime: 1500,
 			WaitTime:    150,
 		},
@@ -327,7 +327,7 @@ func (s *testStmtSummarySuite) TestAddStatement(c *C) {
 			PrewriteRegionNum: 10,
 			TxnRetry:          1,
 		},
-		ScanDetail: &execdetails.ScanDetail{
+		ScanDetail: &util.ScanDetail{
 			TotalKeys:                 600,
 			ProcessedKeys:             150,
 			RocksdbDeleteSkippedCount: 100,
@@ -336,7 +336,7 @@ func (s *testStmtSummarySuite) TestAddStatement(c *C) {
 			RocksdbBlockReadCount: 10,
 			RocksdbBlockReadByte:  1000,
 		},
-		TimeDetail: execdetails.TimeDetail{
+		TimeDetail: util.TimeDetail{
 			ProcessTime: 150,
 			WaitTime:    15,
 		},
@@ -583,7 +583,7 @@ func generateAnyExecInfo() *StmtExecInfo {
 			PrewriteRegionNum: 20,
 			TxnRetry:          2,
 		},
-		ScanDetail: &execdetails.ScanDetail{
+		ScanDetail: &util.ScanDetail{
 			TotalKeys:                 1000,
 			ProcessedKeys:             500,
 			RocksdbDeleteSkippedCount: 100,
@@ -592,7 +592,7 @@ func generateAnyExecInfo() *StmtExecInfo {
 			RocksdbBlockReadCount: 10,
 			RocksdbBlockReadByte:  1000,
 		},
-		TimeDetail: execdetails.TimeDetail{
+		TimeDetail: util.TimeDetail{
 			ProcessTime: 500,
 			WaitTime:    50,
 		},