From 62abfe6000bead94c6d3fc4cfd56b64aff204bb9 Mon Sep 17 00:00:00 2001 From: Lynn Date: Sat, 24 May 2025 20:15:07 +0800 Subject: [PATCH] *: support TopSQL with keyspace info for next-gen (#60884) close pingcap/tidb#60877 --- cmd/tidb-server/main.go | 2 +- pkg/keyspace/keyspace_test.go | 4 +++- pkg/util/topsql/collector/mock/mock.go | 4 ++++ pkg/util/topsql/reporter/datamodel.go | 28 +++++++++++++++------- pkg/util/topsql/reporter/datamodel_test.go | 17 ++++++++++--- pkg/util/topsql/reporter/reporter.go | 15 +++++++++--- pkg/util/topsql/reporter/reporter_test.go | 27 +++++++++++++++++---- pkg/util/topsql/topsql.go | 3 ++- 8 files changed, 78 insertions(+), 22 deletions(-) diff --git a/cmd/tidb-server/main.go b/cmd/tidb-server/main.go index 4711517fa6..fec81f2747 100644 --- a/cmd/tidb-server/main.go +++ b/cmd/tidb-server/main.go @@ -337,7 +337,7 @@ func main() { executor.Stop() close(exited) }) - topsql.SetupTopSQL(svr) + topsql.SetupTopSQL(keyspace.GetKeyspaceIDBySettings(), svr) terror.MustNil(svr.Run(dom)) <-exited syncLog() diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go index f3cc3335c3..91a2689eff 100644 --- a/pkg/keyspace/keyspace_test.go +++ b/pkg/keyspace/keyspace_test.go @@ -52,13 +52,15 @@ func TestNoKeyspaceNameSet(t *testing.T) { require.Equal(t, true, IsKeyspaceNameEmpty(getKeyspaceName)) } -func TestNoKeyspaceIDBySettings(t *testing.T) { +func TestKeyspaceIDBySettings(t *testing.T) { config.UpdateGlobal(func(conf *config.Config) { conf.KeyspaceName = "" }) + // convert KeyspaceName to uint32 failed getKeyspaceID := GetKeyspaceIDBySettings() require.Nil(t, getKeyspaceID) + // get keyspaceID normally config.UpdateGlobal(func(conf *config.Config) { conf.KeyspaceName = "123" }) diff --git a/pkg/util/topsql/collector/mock/mock.go b/pkg/util/topsql/collector/mock/mock.go index b8ee723850..56f52e15a6 100644 --- a/pkg/util/topsql/collector/mock/mock.go +++ b/pkg/util/topsql/collector/mock/mock.go @@ -80,6 +80,10 @@ func (c *TopSQLCollector) Collect(stats []collector.SQLCPUTimeRecord) { // BindProcessCPUTimeUpdater implements TopSQLReporter. func (*TopSQLCollector) BindProcessCPUTimeUpdater(_ collector.ProcessCPUTimeUpdater) {} +// BindKeyspaceID implements TopSQLReporter. +func (c *TopSQLCollector) BindKeyspaceID(_ *uint32) { +} + // CollectStmtStatsMap implements stmtstats.Collector. func (*TopSQLCollector) CollectStmtStatsMap(_ stmtstats.StatementStatsMap) {} diff --git a/pkg/util/topsql/reporter/datamodel.go b/pkg/util/topsql/reporter/datamodel.go index 4d0518419b..a68102aa6d 100644 --- a/pkg/util/topsql/reporter/datamodel.go +++ b/pkg/util/topsql/reporter/datamodel.go @@ -376,12 +376,20 @@ func (r *record) rebuildTsIndex() { } } +func keyspaceIDToScope(keyspaceID *uint32) *tipb.KeyspaceScope { + if keyspaceID == nil { + return nil + } + return &tipb.KeyspaceScope{KeyspaceId: *keyspaceID} +} + // toProto converts the record to the corresponding protobuf representation. -func (r *record) toProto() tipb.TopSQLRecord { +func (r *record) toProto(keyspaceID *uint32) tipb.TopSQLRecord { return tipb.TopSQLRecord{ - SqlDigest: r.sqlDigest, - PlanDigest: r.planDigest, - Items: r.tsItems.toProto(), + KeyspaceScope: keyspaceIDToScope(keyspaceID), + SqlDigest: r.sqlDigest, + PlanDigest: r.planDigest, + Items: r.tsItems.toProto(), } } @@ -416,10 +424,10 @@ func (rs records) topN(n int) (top, evicted records) { } // toProto converts the records to the corresponding protobuf representation. -func (rs records) toProto() []tipb.TopSQLRecord { +func (rs records) toProto(keyspaceID *uint32) []tipb.TopSQLRecord { pb := make([]tipb.TopSQLRecord, 0, len(rs)) for _, r := range rs { - pb = append(pb, r.toProto()) + pb = append(pb, r.toProto(keyspaceID)) } return pb } @@ -641,11 +649,12 @@ func (m *normalizedSQLMap) take() *normalizedSQLMap { } // toProto converts the normalizedSQLMap to the corresponding protobuf representation. -func (m *normalizedSQLMap) toProto() []tipb.SQLMeta { +func (m *normalizedSQLMap) toProto(keyspaceID *uint32) []tipb.SQLMeta { metas := make([]tipb.SQLMeta, 0, m.length.Load()) m.data.Load().Range(func(k, v any) bool { meta := v.(sqlMeta) metas = append(metas, tipb.SQLMeta{ + KeyspaceScope: keyspaceIDToScope(keyspaceID), SqlDigest: []byte(k.(string)), NormalizedSql: meta.normalizedSQL, IsInternalSql: meta.isInternal, @@ -705,12 +714,13 @@ func (m *normalizedPlanMap) take() *normalizedPlanMap { } // toProto converts the normalizedPlanMap to the corresponding protobuf representation. -func (m *normalizedPlanMap) toProto(decodePlan planBinaryDecodeFunc, compressPlan planBinaryCompressFunc) []tipb.PlanMeta { +func (m *normalizedPlanMap) toProto(keyspaceID *uint32, decodePlan planBinaryDecodeFunc, compressPlan planBinaryCompressFunc) []tipb.PlanMeta { metas := make([]tipb.PlanMeta, 0, m.length.Load()) m.data.Load().Range(func(k, v any) bool { originalMeta := v.(planMeta) protoMeta := tipb.PlanMeta{ - PlanDigest: hack.Slice(k.(string)), + KeyspaceScope: keyspaceIDToScope(keyspaceID), + PlanDigest: hack.Slice(k.(string)), } var err error diff --git a/pkg/util/topsql/reporter/datamodel_test.go b/pkg/util/topsql/reporter/datamodel_test.go index e20efd31ba..509b4fc520 100644 --- a/pkg/util/topsql/reporter/datamodel_test.go +++ b/pkg/util/topsql/reporter/datamodel_test.go @@ -201,7 +201,9 @@ func Test_record_toProto(t *testing.T) { totalCPUTimeMs: 123, tsItems: tsItems{{}, {}, {}}, } - pb := r.toProto() + id := uint32(123) + pb := r.toProto(&id) + require.Equal(t, id, pb.GetKeyspaceScope().KeyspaceId) require.Equal(t, []byte("SQL-1"), pb.SqlDigest) require.Equal(t, []byte("PLAN-1"), pb.PlanDigest) require.Len(t, pb.Items, 3) @@ -235,7 +237,7 @@ func Test_records_topN(t *testing.T) { func Test_records_toProto(t *testing.T) { rs := records{{}, {}} - pb := rs.toProto() + pb := rs.toProto(nil) require.Len(t, pb, 2) } @@ -373,23 +375,27 @@ func Test_normalizedSQLMap_toProto(t *testing.T) { m.register([]byte("SQL-1"), "SQL-1", true) m.register([]byte("SQL-2"), "SQL-2", false) m.register([]byte("SQL-3"), "SQL-3", true) - pb := m.toProto() + id := uint32(12345) + pb := m.toProto(&id) require.Len(t, pb, 3) hash := map[string]tipb.SQLMeta{} for _, meta := range pb { hash[meta.NormalizedSql] = meta } require.Equal(t, tipb.SQLMeta{ + KeyspaceScope: keyspaceIDToScope(&id), SqlDigest: []byte("SQL-1"), NormalizedSql: "SQL-1", IsInternalSql: true, }, hash["SQL-1"]) require.Equal(t, tipb.SQLMeta{ + KeyspaceScope: keyspaceIDToScope(&id), SqlDigest: []byte("SQL-2"), NormalizedSql: "SQL-2", IsInternalSql: false, }, hash["SQL-2"]) require.Equal(t, tipb.SQLMeta{ + KeyspaceScope: keyspaceIDToScope(&id), SqlDigest: []byte("SQL-3"), NormalizedSql: "SQL-3", IsInternalSql: true, @@ -450,7 +456,9 @@ func Test_normalizedPlanMap_toProto(t *testing.T) { m.register([]byte("PLAN-1"), "PLAN-1", false) m.register([]byte("PLAN-2"), "PLAN-2", true) m.register([]byte("PLAN-3"), "PLAN-3", false) + id := uint32(12345) pb := m.toProto( + &id, func(s string) (string, error) { return "[decoded] " + s, nil }, func(s []byte) string { return "[encoded] " + string(s) }) require.Len(t, pb, 3) @@ -459,14 +467,17 @@ func Test_normalizedPlanMap_toProto(t *testing.T) { hash[string(meta.PlanDigest)] = meta } require.Equal(t, tipb.PlanMeta{ + KeyspaceScope: keyspaceIDToScope(&id), PlanDigest: []byte("PLAN-1"), NormalizedPlan: "[decoded] PLAN-1", }, hash["PLAN-1"]) require.Equal(t, tipb.PlanMeta{ + KeyspaceScope: keyspaceIDToScope(&id), PlanDigest: []byte("PLAN-2"), EncodedNormalizedPlan: "[encoded] PLAN-2", }, hash["PLAN-2"]) require.Equal(t, tipb.PlanMeta{ + KeyspaceScope: keyspaceIDToScope(&id), PlanDigest: []byte("PLAN-3"), NormalizedPlan: "[decoded] PLAN-3", }, hash["PLAN-3"]) diff --git a/pkg/util/topsql/reporter/reporter.go b/pkg/util/topsql/reporter/reporter.go index 48d25b7495..4e11ffef7b 100644 --- a/pkg/util/topsql/reporter/reporter.go +++ b/pkg/util/topsql/reporter/reporter.go @@ -58,6 +58,9 @@ type TopSQLReporter interface { // BindProcessCPUTimeUpdater is used to pass ProcessCPUTimeUpdater BindProcessCPUTimeUpdater(updater collector.ProcessCPUTimeUpdater) + // BindKeyspaceID binds the keyspace ID to the reporter. + BindKeyspaceID(keyspaceID *uint32) + // Close uses to close and release the reporter resource. Close() } @@ -68,6 +71,7 @@ var _ DataSinkRegisterer = &RemoteTopSQLReporter{} // RemoteTopSQLReporter implements TopSQLReporter that sends data to a remote agent. // This should be called periodically to collect TopSQL resource usage metrics. type RemoteTopSQLReporter struct { + keyspaceID *uint32 ctx context.Context reportCollectedDataChan chan collectedData cancel context.CancelFunc @@ -136,6 +140,11 @@ func (tsr *RemoteTopSQLReporter) BindProcessCPUTimeUpdater(updater collector.Pro tsr.sqlCPUCollector.SetProcessCPUUpdater(updater) } +// BindKeyspaceID implements TopSQLReporter. +func (tsr *RemoteTopSQLReporter) BindKeyspaceID(keyspaceID *uint32) { + tsr.keyspaceID = keyspaceID +} + // CollectStmtStatsMap implements stmtstats.Collector. // // WARN: It will drop the DataRecords if the processing is not in time. @@ -277,9 +286,9 @@ func (tsr *RemoteTopSQLReporter) reportWorker() { rs := data.collected.getReportRecords() // Convert to protobuf data and do report. tsr.doReport(&ReportData{ - DataRecords: rs.toProto(), - SQLMetas: data.normalizedSQLMap.toProto(), - PlanMetas: data.normalizedPlanMap.toProto(tsr.decodePlan, tsr.compressPlan), + DataRecords: rs.toProto(tsr.keyspaceID), + SQLMetas: data.normalizedSQLMap.toProto(tsr.keyspaceID), + PlanMetas: data.normalizedPlanMap.toProto(tsr.keyspaceID, tsr.decodePlan, tsr.compressPlan), }) case <-tsr.ctx.Done(): return diff --git a/pkg/util/topsql/reporter/reporter_test.go b/pkg/util/topsql/reporter/reporter_test.go index 23708b196e..c212e24ebf 100644 --- a/pkg/util/topsql/reporter/reporter_test.go +++ b/pkg/util/topsql/reporter/reporter_test.go @@ -33,6 +33,8 @@ const ( maxSQLNum = 5000 ) +var keyspaceID = uint32(123) + func populateCache(tsr *RemoteTopSQLReporter, begin, end int, timestamp uint64) { // register normalized sql for i := begin; i < end; i++ { @@ -61,9 +63,9 @@ func populateCache(tsr *RemoteTopSQLReporter, begin, end int, timestamp uint64) func reportCache(tsr *RemoteTopSQLReporter) { tsr.doReport(&ReportData{ - DataRecords: tsr.collecting.take().getReportRecords().toProto(), - SQLMetas: tsr.normalizedSQLMap.take().toProto(), - PlanMetas: tsr.normalizedPlanMap.take().toProto(tsr.decodePlan, tsr.compressPlan), + DataRecords: tsr.collecting.take().getReportRecords().toProto(&keyspaceID), + SQLMetas: tsr.normalizedSQLMap.take().toProto(&keyspaceID), + PlanMetas: tsr.normalizedPlanMap.take().toProto(&keyspaceID, tsr.decodePlan, tsr.compressPlan), }) } @@ -147,9 +149,11 @@ func TestCollectAndSendBatch(t *testing.T) { } sqlMeta, exist := findSQLMeta(data.SQLMetas, req.SqlDigest) require.True(t, exist) + require.Equal(t, keyspaceIDToScope(&keyspaceID), sqlMeta.KeyspaceScope) require.Equal(t, "sqlNormalized"+strconv.Itoa(id), sqlMeta.NormalizedSql) planMeta, exist := findPlanMeta(data.PlanMetas, req.PlanDigest) require.True(t, exist) + require.Equal(t, keyspaceIDToScope(&keyspaceID), sqlMeta.KeyspaceScope) require.Equal(t, "planNormalized"+strconv.Itoa(id), planMeta.NormalizedPlan) } } @@ -184,9 +188,11 @@ func TestCollectAndEvicted(t *testing.T) { require.Equal(t, uint32(id), req.Items[0].CpuTimeMs) sqlMeta, exist := findSQLMeta(data.SQLMetas, req.SqlDigest) require.True(t, exist) + require.Equal(t, keyspaceIDToScope(&keyspaceID), sqlMeta.KeyspaceScope) require.Equal(t, "sqlNormalized"+strconv.Itoa(id), sqlMeta.NormalizedSql) planMeta, exist := findPlanMeta(data.PlanMetas, req.PlanDigest) require.True(t, exist) + require.Equal(t, keyspaceIDToScope(&keyspaceID), sqlMeta.KeyspaceScope) require.Equal(t, "planNormalized"+strconv.Itoa(id), planMeta.NormalizedPlan) } } @@ -306,10 +312,16 @@ func TestCollectAndTopN(t *testing.T) { require.Equal(t, []byte("sqlDigest6"), results[5].SqlDigest) require.Equal(t, 6, getTotalCPUTime(results[5])) require.Equal(t, 6, len(ds.data[0].SQLMetas)) + require.Equal(t, keyspaceIDToScope(&keyspaceID), results[0].KeyspaceScope) + require.Equal(t, keyspaceIDToScope(&keyspaceID), results[1].KeyspaceScope) + require.Equal(t, keyspaceIDToScope(&keyspaceID), results[2].KeyspaceScope) + require.Equal(t, keyspaceIDToScope(&keyspaceID), results[3].KeyspaceScope) + require.Equal(t, keyspaceIDToScope(&keyspaceID), results[4].KeyspaceScope) + require.Equal(t, keyspaceIDToScope(&keyspaceID), results[5].KeyspaceScope) } func TestCollectCapacity(t *testing.T) { - tsr, _ := setupRemoteTopSQLReporter(maxSQLNum, 60) + tsr, _ := setupRemoteTopSQLReporter(maxSQLNum, 62) registerSQL := func(n int) { for i := range n { key := []byte("sqlDigest" + strconv.Itoa(i)) @@ -388,6 +400,7 @@ func TestCollectInternal(t *testing.T) { sqlMeta, exist := findSQLMeta(data.SQLMetas, req.SqlDigest) require.True(t, exist) require.Equal(t, id%2 == 0, sqlMeta.IsInternalSql) + require.Equal(t, keyspaceIDToScope(&keyspaceID), sqlMeta.KeyspaceScope) } } @@ -419,6 +432,7 @@ func TestMultipleDataSinks(t *testing.T) { d := <-ch require.NotNil(t, d) require.Len(t, d.DataRecords, 1) + require.Equal(t, keyspaceIDToScope(&keyspaceID), d.DataRecords[0].KeyspaceScope) require.Equal(t, []byte("sqlDigest1"), d.DataRecords[0].SqlDigest) require.Equal(t, []byte("planDigest1"), d.DataRecords[0].PlanDigest) require.Len(t, d.DataRecords[0].Items, 1) @@ -426,11 +440,13 @@ func TestMultipleDataSinks(t *testing.T) { require.Equal(t, uint32(2), d.DataRecords[0].Items[0].CpuTimeMs) require.Equal(t, []tipb.SQLMeta{{ + KeyspaceScope: keyspaceIDToScope(&keyspaceID), SqlDigest: []byte("sqlDigest1"), NormalizedSql: "sqlNormalized1", }}, d.SQLMetas) require.Equal(t, []tipb.PlanMeta{{ + KeyspaceScope: keyspaceIDToScope(&keyspaceID), PlanDigest: []byte("planDigest1"), NormalizedPlan: "planNormalized1", }}, d.PlanMetas) @@ -451,6 +467,7 @@ func TestMultipleDataSinks(t *testing.T) { d := <-chs[i] require.NotNil(t, d) require.Len(t, d.DataRecords, 1) + require.Equal(t, keyspaceIDToScope(&keyspaceID), d.DataRecords[0].KeyspaceScope) require.Equal(t, []byte("sqlDigest4"), d.DataRecords[0].SqlDigest) require.Equal(t, []byte("planDigest4"), d.DataRecords[0].PlanDigest) require.Len(t, d.DataRecords[0].Items, 1) @@ -458,12 +475,14 @@ func TestMultipleDataSinks(t *testing.T) { require.Equal(t, uint32(5), d.DataRecords[0].Items[0].CpuTimeMs) require.Equal(t, []tipb.SQLMeta{{ + KeyspaceScope: keyspaceIDToScope(&keyspaceID), SqlDigest: []byte("sqlDigest4"), NormalizedSql: "sqlNormalized4", IsInternalSql: true, }}, d.SQLMetas) require.Equal(t, []tipb.PlanMeta{{ + KeyspaceScope: keyspaceIDToScope(&keyspaceID), PlanDigest: []byte("planDigest4"), NormalizedPlan: "planNormalized4", }}, d.PlanMetas) diff --git a/pkg/util/topsql/topsql.go b/pkg/util/topsql/topsql.go index 91c0d15883..90649ca1f6 100644 --- a/pkg/util/topsql/topsql.go +++ b/pkg/util/topsql/topsql.go @@ -51,7 +51,8 @@ func init() { } // SetupTopSQL sets up the top-sql worker. -func SetupTopSQL(updater collector.ProcessCPUTimeUpdater) { +func SetupTopSQL(keyspaceID *uint32, updater collector.ProcessCPUTimeUpdater) { + globalTopSQLReport.BindKeyspaceID(keyspaceID) globalTopSQLReport.BindProcessCPUTimeUpdater(updater) globalTopSQLReport.Start() singleTargetDataSink.Start()