diff --git a/util/topsql/reporter/datamodel.go b/util/topsql/reporter/datamodel.go index 48a5161491..c03e41724b 100644 --- a/util/topsql/reporter/datamodel.go +++ b/util/topsql/reporter/datamodel.go @@ -496,29 +496,15 @@ func (c *collecting) appendOthersStmtStatsItem(timestamp uint64, item stmtstats. others.appendStmtStatsItem(timestamp, item) } -// compactToTopNAndOthers returns the largest N records, other records will be packed and appended to the end. -func (c *collecting) compactToTopNAndOthers(n int) records { +// getReportRecords returns all records, others record will be packed and appended to the end. +func (c *collecting) getReportRecords() records { others := c.records[keyOthers] delete(c.records, keyOthers) rs := make(records, 0, len(c.records)) for _, v := range c.records { rs = append(rs, *v) } - // Fetch TopN records. - var evicted records - rs, evicted = rs.topN(n) - if others != nil { - // Sort the records by timestamp to fix the affect of time jump backward. - sort.Sort(others) - } else { - others = newRecord(nil, nil) - } - for _, evict := range evicted { - e := evict // Avoid implicit memory aliasing in for loop. - others.merge(&e) - } - if others.totalCPUTimeMs > 0 { - // append others which summarize all evicted item's cpu-time. + if others != nil && others.totalCPUTimeMs > 0 { rs = append(rs, *others) } return rs diff --git a/util/topsql/reporter/datamodel_test.go b/util/topsql/reporter/datamodel_test.go index a0b9978466..bb9015c75b 100644 --- a/util/topsql/reporter/datamodel_test.go +++ b/util/topsql/reporter/datamodel_test.go @@ -273,19 +273,16 @@ func Test_collecting_appendOthers(t *testing.T) { assert.Equal(t, uint64(2000), r.tsItems[1].stmtStats.SumDurationNs) } -func Test_collecting_compactToTopNAndOthers(t *testing.T) { +func Test_collecting_getReportRecords(t *testing.T) { c := newCollecting() c.getOrCreateRecord([]byte("SQL-1"), []byte("PLAN-1")).appendCPUTime(1, 1) c.getOrCreateRecord([]byte("SQL-2"), []byte("PLAN-2")).appendCPUTime(1, 2) c.getOrCreateRecord([]byte("SQL-3"), []byte("PLAN-3")).appendCPUTime(1, 3) - rs := c.compactToTopNAndOthers(1) - assert.Len(t, rs, 2) - assert.Equal(t, []byte("SQL-3"), rs[0].sqlDigest) - assert.Equal(t, []byte("PLAN-3"), rs[0].planDigest) - assert.Equal(t, uint64(3), rs[0].totalCPUTimeMs) - assert.Len(t, rs[0].tsItems, 1) - assert.Equal(t, uint32(3), rs[0].tsItems[0].cpuTimeMs) - assert.Equal(t, uint64(3), rs[1].totalCPUTimeMs) // 1 + 2 = 3 + c.getOrCreateRecord([]byte(keyOthers), []byte(keyOthers)).appendCPUTime(1, 10) + rs := c.getReportRecords() + assert.Len(t, rs, 4) + assert.Equal(t, uint32(10), rs[3].tsItems[0].cpuTimeMs) + assert.Equal(t, uint64(10), rs[3].totalCPUTimeMs) } func Test_collecting_take(t *testing.T) { diff --git a/util/topsql/reporter/datasink.go b/util/topsql/reporter/datasink.go index 7a269d73af..0af082f75c 100644 --- a/util/topsql/reporter/datasink.go +++ b/util/topsql/reporter/datasink.go @@ -43,7 +43,7 @@ type DataSinkRegisterer interface { // ReportData contains data that reporter sends to the agent. type ReportData struct { - // DataRecords contains the compactToTopNAndOthers records []tipb.TopSQLRecord and the `others` + // DataRecords contains the topN records of each second and the `others` // record which aggregation all []tipb.TopSQLRecord that is out of Top N. DataRecords []tipb.TopSQLRecord SQLMetas []tipb.SQLMeta diff --git a/util/topsql/reporter/reporter.go b/util/topsql/reporter/reporter.go index 896b0a01bc..b3a8497f9c 100644 --- a/util/topsql/reporter/reporter.go +++ b/util/topsql/reporter/reporter.go @@ -277,8 +277,7 @@ func (tsr *RemoteTopSQLReporter) reportWorker() { // that `data` contains. So we wait for a little while to ensure that writes // are finished. time.Sleep(time.Millisecond * 100) - // Get top N records from records. - rs := data.collected.compactToTopNAndOthers(int(topsqlstate.GlobalState.MaxStatementCount.Load())) + rs := data.collected.getReportRecords() // Convert to protobuf data and do report. tsr.doReport(&ReportData{ DataRecords: rs.toProto(), diff --git a/util/topsql/reporter/reporter_test.go b/util/topsql/reporter/reporter_test.go index ab0095d6e7..999f187811 100644 --- a/util/topsql/reporter/reporter_test.go +++ b/util/topsql/reporter/reporter_test.go @@ -224,32 +224,52 @@ func TestCollectAndTopN(t *testing.T) { records := []collector.SQLCPUTimeRecord{ newSQLCPUTimeRecord(tsr, 1, 1), newSQLCPUTimeRecord(tsr, 2, 2), + newSQLCPUTimeRecord(tsr, 3, 3), } + // SQL-2: 2ms + // SQL-3: 3ms + // Others: 1ms collectAndWait(tsr, 1, records) records = []collector.SQLCPUTimeRecord{ - newSQLCPUTimeRecord(tsr, 3, 3), newSQLCPUTimeRecord(tsr, 1, 1), + newSQLCPUTimeRecord(tsr, 3, 3), } + // SQL-1: 1ms + // SQL-3: 3ms collectAndWait(tsr, 2, records) records = []collector.SQLCPUTimeRecord{ - newSQLCPUTimeRecord(tsr, 4, 1), - newSQLCPUTimeRecord(tsr, 1, 1), + newSQLCPUTimeRecord(tsr, 4, 4), + newSQLCPUTimeRecord(tsr, 1, 10), + newSQLCPUTimeRecord(tsr, 3, 1), } + // SQL-1: 10ms + // SQL-4: 4ms + // Others: 1ms collectAndWait(tsr, 3, records) records = []collector.SQLCPUTimeRecord{ - newSQLCPUTimeRecord(tsr, 5, 1), - newSQLCPUTimeRecord(tsr, 1, 1), + newSQLCPUTimeRecord(tsr, 5, 5), + newSQLCPUTimeRecord(tsr, 4, 4), + newSQLCPUTimeRecord(tsr, 1, 10), + newSQLCPUTimeRecord(tsr, 2, 20), } + // SQL-2: 20ms + // SQL-1: 1ms + // Others: 9ms collectAndWait(tsr, 4, records) // Test for time jump back. records = []collector.SQLCPUTimeRecord{ - newSQLCPUTimeRecord(tsr, 6, 1), + newSQLCPUTimeRecord(tsr, 6, 6), newSQLCPUTimeRecord(tsr, 1, 1), + newSQLCPUTimeRecord(tsr, 2, 2), + newSQLCPUTimeRecord(tsr, 3, 3), } + // SQL-6: 6ms + // SQL-3: 3ms + // Others: 3ms collectAndWait(tsr, 0, records) // Wait agent server collect finish. @@ -257,7 +277,14 @@ func TestCollectAndTopN(t *testing.T) { // check for equality of server received batch and the original data results := agentServer.GetLatestRecords() - require.Len(t, results, 3) + // Digest total + // "": 14ms (others) + // SQL-1: 21ms + // SQL-2: 22ms + // SQL-3: 9ms + // SQL-4: 4ms + // SQL-6: 6ms + require.Len(t, results, 6) sort.Slice(results, func(i, j int) bool { return string(results[i].SqlDigest) < string(results[j].SqlDigest) }) @@ -269,19 +296,26 @@ func TestCollectAndTopN(t *testing.T) { return int(total) } require.Nil(t, results[0].SqlDigest) - require.Equal(t, 5, getTotalCPUTime(results[0])) - require.Equal(t, uint64(0), results[0].Items[0].TimestampSec) - require.Equal(t, uint64(1), results[0].Items[1].TimestampSec) - require.Equal(t, uint64(3), results[0].Items[2].TimestampSec) - require.Equal(t, uint64(4), results[0].Items[3].TimestampSec) + require.Equal(t, []byte(nil), results[0].SqlDigest) + require.Equal(t, 14, getTotalCPUTime(results[0])) + require.Equal(t, uint64(1), results[0].Items[0].TimestampSec) + require.Equal(t, uint64(3), results[0].Items[1].TimestampSec) + require.Equal(t, uint64(4), results[0].Items[2].TimestampSec) + require.Equal(t, uint64(0), results[0].Items[3].TimestampSec) require.Equal(t, uint32(1), results[0].Items[0].CpuTimeMs) - require.Equal(t, uint32(2), results[0].Items[1].CpuTimeMs) - require.Equal(t, uint32(1), results[0].Items[2].CpuTimeMs) - require.Equal(t, uint32(1), results[0].Items[3].CpuTimeMs) + require.Equal(t, uint32(1), results[0].Items[1].CpuTimeMs) + require.Equal(t, uint32(9), results[0].Items[2].CpuTimeMs) + require.Equal(t, uint32(3), results[0].Items[3].CpuTimeMs) require.Equal(t, []byte("sqlDigest1"), results[1].SqlDigest) - require.Equal(t, 5, getTotalCPUTime(results[1])) - require.Equal(t, []byte("sqlDigest3"), results[2].SqlDigest) - require.Equal(t, 3, getTotalCPUTime(results[2])) + require.Equal(t, 21, getTotalCPUTime(results[1])) + require.Equal(t, []byte("sqlDigest2"), results[2].SqlDigest) + require.Equal(t, 22, getTotalCPUTime(results[2])) + require.Equal(t, []byte("sqlDigest3"), results[3].SqlDigest) + require.Equal(t, 9, getTotalCPUTime(results[3])) + require.Equal(t, []byte("sqlDigest4"), results[4].SqlDigest) + require.Equal(t, 4, getTotalCPUTime(results[4])) + require.Equal(t, []byte("sqlDigest6"), results[5].SqlDigest) + require.Equal(t, 6, getTotalCPUTime(results[5])) // sleep to wait for all SQL meta received. time.Sleep(50 * time.Millisecond) totalMetas := agentServer.GetTotalSQLMetas() diff --git a/util/topsql/state/state.go b/util/topsql/state/state.go index 7298e4b425..4b89e38402 100644 --- a/util/topsql/state/state.go +++ b/util/topsql/state/state.go @@ -20,7 +20,7 @@ import "go.uber.org/atomic" const ( DefTiDBTopSQLEnable = false DefTiDBTopSQLPrecisionSeconds = 1 - DefTiDBTopSQLMaxStatementCount = 200 + DefTiDBTopSQLMaxStatementCount = 100 DefTiDBTopSQLMaxCollect = 5000 DefTiDBTopSQLReportIntervalSeconds = 60 )