topsql: remove do topN before report (#31192)
This commit is contained in:
@ -496,29 +496,15 @@ func (c *collecting) appendOthersStmtStatsItem(timestamp uint64, item stmtstats.
|
||||
others.appendStmtStatsItem(timestamp, item)
|
||||
}
|
||||
|
||||
// compactToTopNAndOthers returns the largest N records, other records will be packed and appended to the end.
|
||||
func (c *collecting) compactToTopNAndOthers(n int) records {
|
||||
// getReportRecords returns all records, others record will be packed and appended to the end.
|
||||
func (c *collecting) getReportRecords() records {
|
||||
others := c.records[keyOthers]
|
||||
delete(c.records, keyOthers)
|
||||
rs := make(records, 0, len(c.records))
|
||||
for _, v := range c.records {
|
||||
rs = append(rs, *v)
|
||||
}
|
||||
// Fetch TopN records.
|
||||
var evicted records
|
||||
rs, evicted = rs.topN(n)
|
||||
if others != nil {
|
||||
// Sort the records by timestamp to fix the affect of time jump backward.
|
||||
sort.Sort(others)
|
||||
} else {
|
||||
others = newRecord(nil, nil)
|
||||
}
|
||||
for _, evict := range evicted {
|
||||
e := evict // Avoid implicit memory aliasing in for loop.
|
||||
others.merge(&e)
|
||||
}
|
||||
if others.totalCPUTimeMs > 0 {
|
||||
// append others which summarize all evicted item's cpu-time.
|
||||
if others != nil && others.totalCPUTimeMs > 0 {
|
||||
rs = append(rs, *others)
|
||||
}
|
||||
return rs
|
||||
|
||||
@ -273,19 +273,16 @@ func Test_collecting_appendOthers(t *testing.T) {
|
||||
assert.Equal(t, uint64(2000), r.tsItems[1].stmtStats.SumDurationNs)
|
||||
}
|
||||
|
||||
func Test_collecting_compactToTopNAndOthers(t *testing.T) {
|
||||
func Test_collecting_getReportRecords(t *testing.T) {
|
||||
c := newCollecting()
|
||||
c.getOrCreateRecord([]byte("SQL-1"), []byte("PLAN-1")).appendCPUTime(1, 1)
|
||||
c.getOrCreateRecord([]byte("SQL-2"), []byte("PLAN-2")).appendCPUTime(1, 2)
|
||||
c.getOrCreateRecord([]byte("SQL-3"), []byte("PLAN-3")).appendCPUTime(1, 3)
|
||||
rs := c.compactToTopNAndOthers(1)
|
||||
assert.Len(t, rs, 2)
|
||||
assert.Equal(t, []byte("SQL-3"), rs[0].sqlDigest)
|
||||
assert.Equal(t, []byte("PLAN-3"), rs[0].planDigest)
|
||||
assert.Equal(t, uint64(3), rs[0].totalCPUTimeMs)
|
||||
assert.Len(t, rs[0].tsItems, 1)
|
||||
assert.Equal(t, uint32(3), rs[0].tsItems[0].cpuTimeMs)
|
||||
assert.Equal(t, uint64(3), rs[1].totalCPUTimeMs) // 1 + 2 = 3
|
||||
c.getOrCreateRecord([]byte(keyOthers), []byte(keyOthers)).appendCPUTime(1, 10)
|
||||
rs := c.getReportRecords()
|
||||
assert.Len(t, rs, 4)
|
||||
assert.Equal(t, uint32(10), rs[3].tsItems[0].cpuTimeMs)
|
||||
assert.Equal(t, uint64(10), rs[3].totalCPUTimeMs)
|
||||
}
|
||||
|
||||
func Test_collecting_take(t *testing.T) {
|
||||
|
||||
@ -43,7 +43,7 @@ type DataSinkRegisterer interface {
|
||||
|
||||
// ReportData contains data that reporter sends to the agent.
|
||||
type ReportData struct {
|
||||
// DataRecords contains the compactToTopNAndOthers records []tipb.TopSQLRecord and the `others`
|
||||
// DataRecords contains the topN records of each second and the `others`
|
||||
// record which aggregation all []tipb.TopSQLRecord that is out of Top N.
|
||||
DataRecords []tipb.TopSQLRecord
|
||||
SQLMetas []tipb.SQLMeta
|
||||
|
||||
@ -277,8 +277,7 @@ func (tsr *RemoteTopSQLReporter) reportWorker() {
|
||||
// that `data` contains. So we wait for a little while to ensure that writes
|
||||
// are finished.
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
// Get top N records from records.
|
||||
rs := data.collected.compactToTopNAndOthers(int(topsqlstate.GlobalState.MaxStatementCount.Load()))
|
||||
rs := data.collected.getReportRecords()
|
||||
// Convert to protobuf data and do report.
|
||||
tsr.doReport(&ReportData{
|
||||
DataRecords: rs.toProto(),
|
||||
|
||||
@ -224,32 +224,52 @@ func TestCollectAndTopN(t *testing.T) {
|
||||
records := []collector.SQLCPUTimeRecord{
|
||||
newSQLCPUTimeRecord(tsr, 1, 1),
|
||||
newSQLCPUTimeRecord(tsr, 2, 2),
|
||||
newSQLCPUTimeRecord(tsr, 3, 3),
|
||||
}
|
||||
// SQL-2: 2ms
|
||||
// SQL-3: 3ms
|
||||
// Others: 1ms
|
||||
collectAndWait(tsr, 1, records)
|
||||
|
||||
records = []collector.SQLCPUTimeRecord{
|
||||
newSQLCPUTimeRecord(tsr, 3, 3),
|
||||
newSQLCPUTimeRecord(tsr, 1, 1),
|
||||
newSQLCPUTimeRecord(tsr, 3, 3),
|
||||
}
|
||||
// SQL-1: 1ms
|
||||
// SQL-3: 3ms
|
||||
collectAndWait(tsr, 2, records)
|
||||
|
||||
records = []collector.SQLCPUTimeRecord{
|
||||
newSQLCPUTimeRecord(tsr, 4, 1),
|
||||
newSQLCPUTimeRecord(tsr, 1, 1),
|
||||
newSQLCPUTimeRecord(tsr, 4, 4),
|
||||
newSQLCPUTimeRecord(tsr, 1, 10),
|
||||
newSQLCPUTimeRecord(tsr, 3, 1),
|
||||
}
|
||||
// SQL-1: 10ms
|
||||
// SQL-4: 4ms
|
||||
// Others: 1ms
|
||||
collectAndWait(tsr, 3, records)
|
||||
|
||||
records = []collector.SQLCPUTimeRecord{
|
||||
newSQLCPUTimeRecord(tsr, 5, 1),
|
||||
newSQLCPUTimeRecord(tsr, 1, 1),
|
||||
newSQLCPUTimeRecord(tsr, 5, 5),
|
||||
newSQLCPUTimeRecord(tsr, 4, 4),
|
||||
newSQLCPUTimeRecord(tsr, 1, 10),
|
||||
newSQLCPUTimeRecord(tsr, 2, 20),
|
||||
}
|
||||
// SQL-2: 20ms
|
||||
// SQL-1: 1ms
|
||||
// Others: 9ms
|
||||
collectAndWait(tsr, 4, records)
|
||||
|
||||
// Test for time jump back.
|
||||
records = []collector.SQLCPUTimeRecord{
|
||||
newSQLCPUTimeRecord(tsr, 6, 1),
|
||||
newSQLCPUTimeRecord(tsr, 6, 6),
|
||||
newSQLCPUTimeRecord(tsr, 1, 1),
|
||||
newSQLCPUTimeRecord(tsr, 2, 2),
|
||||
newSQLCPUTimeRecord(tsr, 3, 3),
|
||||
}
|
||||
// SQL-6: 6ms
|
||||
// SQL-3: 3ms
|
||||
// Others: 3ms
|
||||
collectAndWait(tsr, 0, records)
|
||||
|
||||
// Wait agent server collect finish.
|
||||
@ -257,7 +277,14 @@ func TestCollectAndTopN(t *testing.T) {
|
||||
|
||||
// check for equality of server received batch and the original data
|
||||
results := agentServer.GetLatestRecords()
|
||||
require.Len(t, results, 3)
|
||||
// Digest total
|
||||
// "": 14ms (others)
|
||||
// SQL-1: 21ms
|
||||
// SQL-2: 22ms
|
||||
// SQL-3: 9ms
|
||||
// SQL-4: 4ms
|
||||
// SQL-6: 6ms
|
||||
require.Len(t, results, 6)
|
||||
sort.Slice(results, func(i, j int) bool {
|
||||
return string(results[i].SqlDigest) < string(results[j].SqlDigest)
|
||||
})
|
||||
@ -269,19 +296,26 @@ func TestCollectAndTopN(t *testing.T) {
|
||||
return int(total)
|
||||
}
|
||||
require.Nil(t, results[0].SqlDigest)
|
||||
require.Equal(t, 5, getTotalCPUTime(results[0]))
|
||||
require.Equal(t, uint64(0), results[0].Items[0].TimestampSec)
|
||||
require.Equal(t, uint64(1), results[0].Items[1].TimestampSec)
|
||||
require.Equal(t, uint64(3), results[0].Items[2].TimestampSec)
|
||||
require.Equal(t, uint64(4), results[0].Items[3].TimestampSec)
|
||||
require.Equal(t, []byte(nil), results[0].SqlDigest)
|
||||
require.Equal(t, 14, getTotalCPUTime(results[0]))
|
||||
require.Equal(t, uint64(1), results[0].Items[0].TimestampSec)
|
||||
require.Equal(t, uint64(3), results[0].Items[1].TimestampSec)
|
||||
require.Equal(t, uint64(4), results[0].Items[2].TimestampSec)
|
||||
require.Equal(t, uint64(0), results[0].Items[3].TimestampSec)
|
||||
require.Equal(t, uint32(1), results[0].Items[0].CpuTimeMs)
|
||||
require.Equal(t, uint32(2), results[0].Items[1].CpuTimeMs)
|
||||
require.Equal(t, uint32(1), results[0].Items[2].CpuTimeMs)
|
||||
require.Equal(t, uint32(1), results[0].Items[3].CpuTimeMs)
|
||||
require.Equal(t, uint32(1), results[0].Items[1].CpuTimeMs)
|
||||
require.Equal(t, uint32(9), results[0].Items[2].CpuTimeMs)
|
||||
require.Equal(t, uint32(3), results[0].Items[3].CpuTimeMs)
|
||||
require.Equal(t, []byte("sqlDigest1"), results[1].SqlDigest)
|
||||
require.Equal(t, 5, getTotalCPUTime(results[1]))
|
||||
require.Equal(t, []byte("sqlDigest3"), results[2].SqlDigest)
|
||||
require.Equal(t, 3, getTotalCPUTime(results[2]))
|
||||
require.Equal(t, 21, getTotalCPUTime(results[1]))
|
||||
require.Equal(t, []byte("sqlDigest2"), results[2].SqlDigest)
|
||||
require.Equal(t, 22, getTotalCPUTime(results[2]))
|
||||
require.Equal(t, []byte("sqlDigest3"), results[3].SqlDigest)
|
||||
require.Equal(t, 9, getTotalCPUTime(results[3]))
|
||||
require.Equal(t, []byte("sqlDigest4"), results[4].SqlDigest)
|
||||
require.Equal(t, 4, getTotalCPUTime(results[4]))
|
||||
require.Equal(t, []byte("sqlDigest6"), results[5].SqlDigest)
|
||||
require.Equal(t, 6, getTotalCPUTime(results[5]))
|
||||
// sleep to wait for all SQL meta received.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
totalMetas := agentServer.GetTotalSQLMetas()
|
||||
|
||||
@ -20,7 +20,7 @@ import "go.uber.org/atomic"
|
||||
const (
|
||||
DefTiDBTopSQLEnable = false
|
||||
DefTiDBTopSQLPrecisionSeconds = 1
|
||||
DefTiDBTopSQLMaxStatementCount = 200
|
||||
DefTiDBTopSQLMaxStatementCount = 100
|
||||
DefTiDBTopSQLMaxCollect = 5000
|
||||
DefTiDBTopSQLReportIntervalSeconds = 60
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user