distsql, util: adapt scandetailv2 in coprocessor response (#20492)
@@ -281,6 +281,10 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv
 	}

 	r.stats.mergeCopRuntimeStats(copStats, respTime)
+	if copStats.CopDetail != nil && len(r.copPlanIDs) > 0 {
+		r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordCopDetail(r.copPlanIDs[len(r.copPlanIDs)-1], copStats.CopDetail)
+	}
+
 	for i, detail := range r.selectResp.GetExecutionSummaries() {
 		if detail != nil && detail.TimeProcessedNs != nil &&
 			detail.NumProducedRows != nil && detail.NumIterations != nil {
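A note on the hunk above: copStats.CopDetail carries the per-task scan detail decoded from the coprocessor response (the scandetailv2 data named in the commit title), and it is attributed to the last entry of r.copPlanIDs. Assuming copPlanIDs lists the pushed-down executors in leaf-to-root order, that index is the root of the cop plan tree, and the len(r.copPlanIDs) > 0 guard covers responses with no pushed-down plan.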
@@ -411,7 +411,8 @@ type CopRuntimeStats struct {
 	// have many region leaders, several coprocessor tasks can be sent to the
 	// same tikv-server instance. We have to use a list to maintain all tasks
 	// executed on each instance.
-	stats map[string][]*BasicRuntimeStats
+	stats      map[string][]*BasicRuntimeStats
+	copDetails *CopDetails
 }

 // RecordOneCopTask records a specific cop task's execution detail.
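For readers without the rest of the patch in front of them: CopDetails itself is not defined in these hunks. Below is a minimal sketch of what the struct and its Merge method would have to look like, reconstructed only from the field names used in String() and RecordCopDetail further down; the exact definition and field types in util/execdetails are assumptions.

```go
// Sketch only: field set inferred from this diff, types assumed.
type CopDetails struct {
	TotalKeys                 int64
	ProcessedKeys             int64
	RocksdbDeleteSkippedCount uint64
	RocksdbKeySkippedCount    uint64
	RocksdbBlockCacheHitCount uint64
	RocksdbBlockReadCount     uint64
	RocksdbBlockReadByte      uint64
}

// Merge accumulates another task's detail into d. Assumed to be a plain
// field-wise sum, which is consistent with RecordCopDetail reusing a single
// CopDetails value across all tasks of one plan.
func (d *CopDetails) Merge(other *CopDetails) {
	d.TotalKeys += other.TotalKeys
	d.ProcessedKeys += other.ProcessedKeys
	d.RocksdbDeleteSkippedCount += other.RocksdbDeleteSkippedCount
	d.RocksdbKeySkippedCount += other.RocksdbKeySkippedCount
	d.RocksdbBlockCacheHitCount += other.RocksdbBlockCacheHitCount
	d.RocksdbBlockReadCount += other.RocksdbBlockReadCount
	d.RocksdbBlockReadByte += other.RocksdbBlockReadByte
}
```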
@@ -450,14 +451,23 @@ func (crs *CopRuntimeStats) String() string {
 		}
 	}

+	var result string
 	if totalTasks == 1 {
-		return fmt.Sprintf("tikv_task:{time:%v, loops:%d}", procTimes[0], totalIters)
+		result += fmt.Sprintf("tikv_task:{time:%v, loops:%d}", procTimes[0], totalIters)
+	} else {
+		n := len(procTimes)
+		sort.Slice(procTimes, func(i, j int) bool { return procTimes[i] < procTimes[j] })
+		result += fmt.Sprintf("tikv_task:{proc max:%v, min:%v, p80:%v, p95:%v, iters:%v, tasks:%v}",
+			procTimes[n-1], procTimes[0], procTimes[n*4/5], procTimes[n*19/20], totalIters, totalTasks)
 	}
-
-	n := len(procTimes)
-	sort.Slice(procTimes, func(i, j int) bool { return procTimes[i] < procTimes[j] })
-	return fmt.Sprintf("tikv_task:{proc max:%v, min:%v, p80:%v, p95:%v, iters:%v, tasks:%v}",
-		procTimes[n-1], procTimes[0], procTimes[n*4/5], procTimes[n*19/20], totalIters, totalTasks)
+	if crs.copDetails != nil {
+		result += fmt.Sprintf(", total_keys:%v, processed_keys:%v, rocksdb:{delete_skipped_count:%v, "+
+			"key_skipped_count:%v, block_cache_hit_count:%v, block_read_count:%v, block_read_byte:%v}",
+			crs.copDetails.TotalKeys, crs.copDetails.ProcessedKeys,
+			crs.copDetails.RocksdbDeleteSkippedCount, crs.copDetails.RocksdbKeySkippedCount, crs.copDetails.RocksdbBlockCacheHitCount,
+			crs.copDetails.RocksdbBlockReadCount, crs.copDetails.RocksdbBlockReadByte)
+	}
+	return result
 }

 const (
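A quick, self-contained check of the percentile indexing used above (same formula, standalone program; the five sample durations are made up):

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

func main() {
	// Five made-up per-task processing times, deliberately unsorted.
	procTimes := []time.Duration{5, 1, 4, 2, 3}
	sort.Slice(procTimes, func(i, j int) bool { return procTimes[i] < procTimes[j] })
	n := len(procTimes)
	// Identical index arithmetic to CopRuntimeStats.String:
	// p80 = procTimes[n*4/5], p95 = procTimes[n*19/20].
	fmt.Printf("proc max:%v, min:%v, p80:%v, p95:%v\n",
		procTimes[n-1], procTimes[0], procTimes[n*4/5], procTimes[n*19/20])
	// Prints: proc max:5ns, min:1ns, p80:5ns, p95:5ns
}
```

With few tasks the integer division lands on the last element, so p80 and p95 equal the max; the tests below exercise exactly that n = 2 case.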
@@ -696,6 +706,15 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID int, address string, summary
 	copStats.RecordOneCopTask(address, summary)
 }

+// RecordCopDetail records a specific cop task's cop detail.
+func (e *RuntimeStatsColl) RecordCopDetail(planID int, detail *CopDetails) {
+	copStats := e.GetCopStats(planID)
+	if copStats.copDetails == nil {
+		copStats.copDetails = &CopDetails{}
+	}
+	copStats.copDetails.Merge(detail)
+}
+
 // ExistsRootStats checks if the planID exists in the rootStats collection.
 func (e *RuntimeStatsColl) ExistsRootStats(planID int) bool {
 	e.mu.Lock()
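RecordCopDetail leans on GetCopStats handing back a ready-to-use entry, so copStats is never nil here. That method is not part of this diff; the sketch below shows the behavior it is assumed to have (lazily create the per-plan entry under the collection's lock), and the e.mu / e.copStats field names are likewise assumptions:

```go
// Assumed sketch of the existing helper, not part of this diff:
// lazily creates the per-plan CopRuntimeStats under e.mu so that
// callers such as RecordCopDetail never observe a nil entry.
func (e *RuntimeStatsColl) GetCopStats(planID int) *CopRuntimeStats {
	e.mu.Lock()
	defer e.mu.Unlock()
	copStats, ok := e.copStats[planID]
	if !ok {
		copStats = &CopRuntimeStats{stats: make(map[string][]*BasicRuntimeStats)}
		e.copStats[planID] = copStats
	}
	return copStats
}
```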
@@ -100,11 +100,22 @@ func TestCopRuntimeStats(t *testing.T) {
 	stats.RecordOneCopTask(tableScanID, "8.8.8.9", mockExecutorExecutionSummary(2, 2, 2))
 	stats.RecordOneCopTask(aggID, "8.8.8.8", mockExecutorExecutionSummary(3, 3, 3))
 	stats.RecordOneCopTask(aggID, "8.8.8.9", mockExecutorExecutionSummary(4, 4, 4))
+	copDetails := &CopDetails{
+		TotalKeys:                 10,
+		ProcessedKeys:             10,
+		RocksdbDeleteSkippedCount: 10,
+		RocksdbKeySkippedCount:    1,
+		RocksdbBlockCacheHitCount: 10,
+		RocksdbBlockReadCount:     10,
+		RocksdbBlockReadByte:      100,
+	}
+	stats.RecordCopDetail(tableScanID, copDetails)
 	if stats.ExistsCopStats(tableScanID) != true {
 		t.Fatal("exist")
 	}
 	cop := stats.GetCopStats(tableScanID)
-	if cop.String() != "tikv_task:{proc max:2ns, min:1ns, p80:2ns, p95:2ns, iters:3, tasks:2}" {
+	if cop.String() != "tikv_task:{proc max:2ns, min:1ns, p80:2ns, p95:2ns, iters:3, tasks:2}"+
+		", total_keys:10, processed_keys:10, rocksdb:{delete_skipped_count:10, key_skipped_count:1, block_cache_hit_count:10, block_read_count:10, block_read_byte:100}" {
 		t.Fatal("table_scan")
 	}
 	copStats := cop.stats["8.8.8.8"]
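The expected literal is easy to verify by hand: the table scan has two tasks with times 1ns and 2ns (the 1ns task is recorded just above this hunk's context), so n = 2, n*4/5 = 1 and n*19/20 = 1, and both p80 and p95 resolve to procTimes[1] = 2ns; iters:3 is the 1 + 2 iterations of the two summaries, and the appended cop-detail segment mirrors the copDetails literal recorded above.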
@@ -138,11 +149,22 @@ func TestCopRuntimeStatsForTiFlash(t *testing.T) {
 	stats.RecordOneCopTask(aggID, "8.8.8.9", mockExecutorExecutionSummaryForTiFlash(2, 2, 2, "tablescan_"+strconv.Itoa(tableScanID)))
 	stats.RecordOneCopTask(tableScanID, "8.8.8.8", mockExecutorExecutionSummaryForTiFlash(3, 3, 3, "aggregation_"+strconv.Itoa(aggID)))
 	stats.RecordOneCopTask(tableScanID, "8.8.8.9", mockExecutorExecutionSummaryForTiFlash(4, 4, 4, "aggregation_"+strconv.Itoa(aggID)))
+	copDetails := &CopDetails{
+		TotalKeys:                 10,
+		ProcessedKeys:             10,
+		RocksdbDeleteSkippedCount: 10,
+		RocksdbKeySkippedCount:    1,
+		RocksdbBlockCacheHitCount: 10,
+		RocksdbBlockReadCount:     10,
+		RocksdbBlockReadByte:      100,
+	}
+	stats.RecordCopDetail(tableScanID, copDetails)
 	if stats.ExistsCopStats(tableScanID) != true {
 		t.Fatal("exist")
 	}
 	cop := stats.GetCopStats(tableScanID)
-	if cop.String() != "tikv_task:{proc max:2ns, min:1ns, p80:2ns, p95:2ns, iters:3, tasks:2}" {
+	if cop.String() != "tikv_task:{proc max:2ns, min:1ns, p80:2ns, p95:2ns, iters:3, tasks:2}"+
+		", total_keys:10, processed_keys:10, rocksdb:{delete_skipped_count:10, key_skipped_count:1, block_cache_hit_count:10, block_read_count:10, block_read_byte:100}" {
 		t.Fatal("table_scan")
 	}
 	copStats := cop.stats["8.8.8.8"]