From d208b62025017f122a3d030d50ba802526e7ef48 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 16 Dec 2021 16:06:35 +0800 Subject: [PATCH] topsql: a centralized place to generate tipb report data (#30781) --- server/tidb_test.go | 2 +- util/topsql/reporter/datasink.go | 2 +- util/topsql/reporter/reporter.go | 102 ++++++++++++------ util/topsql/reporter/reporter_test.go | 4 +- util/topsql/reporter/single_target.go | 149 ++++++++++++-------------- util/topsql/topsql.go | 4 +- util/topsql/topsql_test.go | 4 +- 7 files changed, 147 insertions(+), 120 deletions(-) diff --git a/server/tidb_test.go b/server/tidb_test.go index 5c0c2e9e18..60bcee29c2 100644 --- a/server/tidb_test.go +++ b/server/tidb_test.go @@ -1536,7 +1536,7 @@ func TestTopSQLAgent(t *testing.T) { dbt.MustExec("set @@global.tidb_top_sql_report_interval_seconds=2;") dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;") - r := reporter.NewRemoteTopSQLReporter(reporter.NewSingleTargetDataSink(plancodec.DecodeNormalizedPlan)) + r := reporter.NewRemoteTopSQLReporter(reporter.NewSingleTargetDataSink(), plancodec.DecodeNormalizedPlan) tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{r}) // TODO: change to ensure that the right sql statements are reported, not just counts diff --git a/util/topsql/reporter/datasink.go b/util/topsql/reporter/datasink.go index c4206c71dc..b82649095f 100644 --- a/util/topsql/reporter/datasink.go +++ b/util/topsql/reporter/datasink.go @@ -21,7 +21,7 @@ type DataSink interface { // TrySend pushes a report data into the sink, which will later be sent to a target by the sink. A deadline can be // specified to control how late it should be sent. If the sink is kept full and cannot schedule a send within // the specified deadline, or the sink is closed, an error will be returned. - TrySend(data ReportData, deadline time.Time) error + TrySend(data *ReportData, deadline time.Time) error // IsPaused indicates that the DataSink is not expecting to receive records for now // and may resume in the future. diff --git a/util/topsql/reporter/reporter.go b/util/topsql/reporter/reporter.go index 39e3be5eae..574472da60 100644 --- a/util/topsql/reporter/reporter.go +++ b/util/topsql/reporter/reporter.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/topsql/tracecpu" + "github.com/pingcap/tipb/go-tipb" "github.com/wangjohn/quickselect" atomic2 "go.uber.org/atomic" "go.uber.org/zap" @@ -132,6 +133,9 @@ type RemoteTopSQLReporter struct { collectCPUDataChan chan cpuData reportCollectedDataChan chan collectedData + + // calling decodePlan this can take a while, so should not block critical paths + decodePlan planBinaryDecodeFunc } // SQLMeta is the SQL meta which contains the normalized SQL string and a bool field which uses to distinguish internal SQL. 
@@ -144,7 +148,7 @@ type SQLMeta struct {
 //
 // planBinaryDecoder is a decoding function which will be called asynchronously to decode the plan binary to string
 // MaxStatementsNum is the maximum SQL and plan number, which will restrict the memory usage of the internal LFU cache
-func NewRemoteTopSQLReporter(dataSink DataSink) *RemoteTopSQLReporter {
+func NewRemoteTopSQLReporter(dataSink DataSink, decodePlan planBinaryDecodeFunc) *RemoteTopSQLReporter {
 	ctx, cancel := context.WithCancel(context.Background())
 	tsr := &RemoteTopSQLReporter{
 		ctx:                     ctx,
@@ -152,6 +156,7 @@ func NewRemoteTopSQLReporter(dataSink DataSink) *RemoteTopSQLReporter {
 		dataSink:                dataSink,
 		collectCPUDataChan:      make(chan cpuData, 1),
 		reportCollectedDataChan: make(chan collectedData, 1),
+		decodePlan:              decodePlan,
 	}
 	tsr.normalizedSQLMap.Store(&sync.Map{})
 	tsr.normalizedPlanMap.Store(&sync.Map{})
@@ -259,8 +264,8 @@ func addEvictedCPUTime(collectTarget map[string]*dataPoints, timestamp uint64, t
 	others.CPUTimeMsTotal += uint64(totalCPUTimeMs)
 }
 
-// addEvictedIntoSortedDataPoints adds the evict dataPoints into others.
-// Attention, this function depend on others dataPoints is sorted, and this function will modify the evict dataPoints
+// addEvictedIntoSortedDataPoints adds the evicted dataPoints into others.
+// Attention: this function depends on the others dataPoints being sorted, and it will modify the evicted dataPoints
 // to make sure it is sorted by timestamp.
 func addEvictedIntoSortedDataPoints(others *dataPoints, evict *dataPoints) *dataPoints {
 	if others == nil {
@@ -451,29 +456,14 @@ type collectedData struct {
 
 // ReportData contains data that reporter sends to the agent
 type ReportData struct {
-	// collectedData contains the topN collected records and the `others` record which aggregation all records that is out of Top N.
-	collectedData     []*dataPoints
-	normalizedSQLMap  *sync.Map
-	normalizedPlanMap *sync.Map
+	// CPUTimeRecords contains the topN collected records and the `others` record which aggregates all records outside the Top N.
+	CPUTimeRecords []tipb.CPUTimeRecord
+	SQLMetas       []tipb.SQLMeta
+	PlanMetas      []tipb.PlanMeta
 }
 
 func (d *ReportData) hasData() bool {
-	if len(d.collectedData) > 0 {
-		return true
-	}
-	cnt := 0
-	d.normalizedSQLMap.Range(func(key, value interface{}) bool {
-		cnt++
-		return false
-	})
-	if cnt > 0 {
-		return true
-	}
-	d.normalizedPlanMap.Range(func(key, value interface{}) bool {
-		cnt++
-		return false
-	})
-	return cnt > 0
+	return len(d.CPUTimeRecords) != 0 || len(d.SQLMetas) != 0 || len(d.PlanMetas) != 0
 }
 
 // reportWorker sends data to the gRPC endpoint from the `reportCollectedDataChan` one by one.
@@ -497,11 +487,17 @@ func (tsr *RemoteTopSQLReporter) reportWorker() {
 
 // getReportData gets ReportData from the collectedData.
 // This function will calculate the topN collected records and the `others` record which aggregation all records that is out of Top N.
-func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) ReportData {
+func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) *ReportData {
+	records := getTopNFromCollected(collected)
+	return tsr.buildReportData(records, collected.normalizedSQLMap, collected.normalizedPlanMap)
+}
+
+func getTopNFromCollected(collected collectedData) (records []*dataPoints) {
 	// Fetch TopN dataPoints.
 	others := collected.records[keyOthers]
 	delete(collected.records, keyOthers)
-	records := make([]*dataPoints, 0, len(collected.records))
+
+	records = make([]*dataPoints, 0, len(collected.records))
 	for _, v := range collected.records {
 		records = append(records, v)
 	}
@@ -514,7 +510,7 @@ func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) ReportDa
 		sort.Sort(others)
 	}
 	for _, evict := range evicted {
-		// SQL meta will not be evicted, since the evicted SQL can be appear on Other components (TiKV) TopN records.
+		// SQL meta will not be evicted, since the evicted SQL can still appear in other components' (TiKV) TopN records.
 		others = addEvictedIntoSortedDataPoints(others, evict)
 	}
 
@@ -523,14 +519,56 @@ func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) ReportDa
 		records = append(records, others)
 	}
 
-	return ReportData{
-		collectedData:     records,
-		normalizedSQLMap:  collected.normalizedSQLMap,
-		normalizedPlanMap: collected.normalizedPlanMap,
-	}
+	return
 }
 
-func (tsr *RemoteTopSQLReporter) doReport(data ReportData) {
+// buildReportData converts the record data in the dataPoints slice and the metadata in the sync.Maps into a ReportData.
+//
+// Attention: the caller should guarantee that no other reader or writer accesses `sqlMap` and `planMap`, because buildReportData
+// does heavy work inside sync.Map.Range and may block other readers and writers.
+func (tsr *RemoteTopSQLReporter) buildReportData(records []*dataPoints, sqlMap *sync.Map, planMap *sync.Map) *ReportData {
+	res := &ReportData{
+		CPUTimeRecords: make([]tipb.CPUTimeRecord, 0, len(records)),
+		SQLMetas:       make([]tipb.SQLMeta, 0, len(records)),
+		PlanMetas:      make([]tipb.PlanMeta, 0, len(records)),
+	}
+
+	for _, record := range records {
+		res.CPUTimeRecords = append(res.CPUTimeRecords, tipb.CPUTimeRecord{
+			RecordListTimestampSec: record.TimestampList,
+			RecordListCpuTimeMs:    record.CPUTimeMsList,
+			SqlDigest:              record.SQLDigest,
+			PlanDigest:             record.PlanDigest,
+		})
+	}
+
+	sqlMap.Range(func(key, value interface{}) bool {
+		meta := value.(SQLMeta)
+		res.SQLMetas = append(res.SQLMetas, tipb.SQLMeta{
+			SqlDigest:     []byte(key.(string)),
+			NormalizedSql: meta.normalizedSQL,
+			IsInternalSql: meta.isInternal,
+		})
+		return true
+	})
+
+	planMap.Range(func(key, value interface{}) bool {
+		planDecoded, errDecode := tsr.decodePlan(value.(string))
+		if errDecode != nil {
+			logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(errDecode))
+			return true
+		}
+		res.PlanMetas = append(res.PlanMetas, tipb.PlanMeta{
+			PlanDigest:     []byte(key.(string)),
+			NormalizedPlan: planDecoded,
+		})
+		return true
+	})
+
+	return res
+}
+
+func (tsr *RemoteTopSQLReporter) doReport(data *ReportData) {
 	defer util.Recover("top-sql", "doReport", nil, false)
 
 	if !data.hasData() {
diff --git a/util/topsql/reporter/reporter_test.go b/util/topsql/reporter/reporter_test.go
index 33d048fcf1..7f6c9f9a25 100644
--- a/util/topsql/reporter/reporter_test.go
+++ b/util/topsql/reporter/reporter_test.go
@@ -72,8 +72,8 @@ func setupRemoteTopSQLReporter(maxStatementsNum, interval int, addr string) *Rem
 		conf.TopSQL.ReceiverAddress = addr
 	})
 
-	rc := NewSingleTargetDataSink(mockPlanBinaryDecoderFunc)
-	ts := NewRemoteTopSQLReporter(rc)
+	rc := NewSingleTargetDataSink()
+	ts := NewRemoteTopSQLReporter(rc, mockPlanBinaryDecoderFunc)
 	return ts
 }
 
diff --git a/util/topsql/reporter/single_target.go b/util/topsql/reporter/single_target.go
index 5ed12c1185..b7a3425035 100644
--- a/util/topsql/reporter/single_target.go
+++ b/util/topsql/reporter/single_target.go
@@ -37,13 +37,10 @@
type SingleTargetDataSink struct { curRPCAddr string conn *grpc.ClientConn sendTaskCh chan sendTask - - // calling decodePlan this can take a while, so should not block critical paths - decodePlan planBinaryDecodeFunc } // NewSingleTargetDataSink returns a new SingleTargetDataSink -func NewSingleTargetDataSink(decodePlan planBinaryDecodeFunc) *SingleTargetDataSink { +func NewSingleTargetDataSink() *SingleTargetDataSink { ctx, cancel := context.WithCancel(context.Background()) dataSink := &SingleTargetDataSink{ ctx: ctx, @@ -52,8 +49,6 @@ func NewSingleTargetDataSink(decodePlan planBinaryDecodeFunc) *SingleTargetDataS curRPCAddr: "", conn: nil, sendTaskCh: make(chan sendTask, 1), - - decodePlan: decodePlan, } go dataSink.recoverRun() return dataSink @@ -107,7 +102,7 @@ var _ DataSink = &SingleTargetDataSink{} // TrySend implements the DataSink interface. // Currently the implementation will establish a new connection every time, // which is suitable for a per-minute sending period -func (ds *SingleTargetDataSink) TrySend(data ReportData, deadline time.Time) error { +func (ds *SingleTargetDataSink) TrySend(data *ReportData, deadline time.Time) error { select { case ds.sendTaskCh <- sendTask{data: data, deadline: deadline}: return nil @@ -147,7 +142,7 @@ func (ds *SingleTargetDataSink) Close() { ds.conn = nil } -func (ds *SingleTargetDataSink) doSend(ctx context.Context, addr string, data ReportData) error { +func (ds *SingleTargetDataSink) doSend(ctx context.Context, addr string, data *ReportData) error { err := ds.tryEstablishConnection(ctx, addr) if err != nil { return err @@ -159,15 +154,15 @@ func (ds *SingleTargetDataSink) doSend(ctx context.Context, addr string, data Re go func() { defer wg.Done() - errCh <- ds.sendBatchSQLMeta(ctx, data.normalizedSQLMap) + errCh <- ds.sendBatchSQLMeta(ctx, data.SQLMetas) }() go func() { defer wg.Done() - errCh <- ds.sendBatchPlanMeta(ctx, data.normalizedPlanMap) + errCh <- ds.sendBatchPlanMeta(ctx, data.PlanMetas) }() go func() { defer wg.Done() - errCh <- ds.sendBatchCPUTimeRecord(ctx, data.collectedData) + errCh <- ds.sendBatchCPUTimeRecord(ctx, data.CPUTimeRecords) }() wg.Wait() close(errCh) @@ -180,111 +175,105 @@ func (ds *SingleTargetDataSink) doSend(ctx context.Context, addr string, data Re } // sendBatchCPUTimeRecord sends a batch of TopSQL records by stream. 
-func (ds *SingleTargetDataSink) sendBatchCPUTimeRecord(ctx context.Context, records []*dataPoints) error { +func (ds *SingleTargetDataSink) sendBatchCPUTimeRecord(ctx context.Context, records []tipb.CPUTimeRecord) (err error) { if len(records) == 0 { return nil } + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportRecordCounterHistogram.Observe(float64(sentCount)) + if err != nil { + reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } + reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds()) + }() + client := tipb.NewTopSQLAgentClient(ds.conn) stream, err := client.ReportCPUTimeRecords(ctx) if err != nil { return err } - for _, record := range records { - record := &tipb.CPUTimeRecord{ - RecordListTimestampSec: record.TimestampList, - RecordListCpuTimeMs: record.CPUTimeMsList, - SqlDigest: record.SQLDigest, - PlanDigest: record.PlanDigest, - } - if err := stream.Send(record); err != nil { - return err + for i := range records { + if err = stream.Send(&records[i]); err != nil { + return } + sentCount += 1 } - topSQLReportRecordCounterHistogram.Observe(float64(len(records))) + // See https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream for how to avoid leaking the stream _, err = stream.CloseAndRecv() - if err != nil { - reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds()) - return err - } - reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds()) - return nil + return } // sendBatchSQLMeta sends a batch of SQL metas by stream. -func (ds *SingleTargetDataSink) sendBatchSQLMeta(ctx context.Context, sqlMap *sync.Map) error { +func (ds *SingleTargetDataSink) sendBatchSQLMeta(ctx context.Context, sqlMetas []tipb.SQLMeta) (err error) { + if len(sqlMetas) == 0 { + return + } + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportSQLCountHistogram.Observe(float64(sentCount)) + + if err != nil { + reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } + reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds()) + }() + client := tipb.NewTopSQLAgentClient(ds.conn) stream, err := client.ReportSQLMeta(ctx) if err != nil { return err } - cnt := 0 - sqlMap.Range(func(key, value interface{}) bool { - cnt++ - meta := value.(SQLMeta) - sqlMeta := &tipb.SQLMeta{ - SqlDigest: []byte(key.(string)), - NormalizedSql: meta.normalizedSQL, - IsInternalSql: meta.isInternal, + + for i := range sqlMetas { + if err = stream.Send(&sqlMetas[i]); err != nil { + return } - if err = stream.Send(sqlMeta); err != nil { - return false - } - return true - }) - // stream.Send return error - if err != nil { - return err + sentCount += 1 } - topSQLReportSQLCountHistogram.Observe(float64(cnt)) + + // See https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream for how to avoid leaking the stream _, err = stream.CloseAndRecv() - if err != nil { - reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds()) - return err - } - reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds()) - return nil + return } // sendBatchPlanMeta sends a batch of SQL metas by stream. 
-func (ds *SingleTargetDataSink) sendBatchPlanMeta(ctx context.Context, planMap *sync.Map) error { +func (ds *SingleTargetDataSink) sendBatchPlanMeta(ctx context.Context, planMetas []tipb.PlanMeta) (err error) { + if len(planMetas) == 0 { + return nil + } + start := time.Now() + sentCount := 0 + defer func() { + topSQLReportPlanCountHistogram.Observe(float64(sentCount)) + if err != nil { + reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds()) + } + reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds()) + }() + client := tipb.NewTopSQLAgentClient(ds.conn) stream, err := client.ReportPlanMeta(ctx) if err != nil { return err } - cnt := 0 - planMap.Range(func(key, value interface{}) bool { - planDecoded, errDecode := ds.decodePlan(value.(string)) - if errDecode != nil { - logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(errDecode)) - return true + + for i := range planMetas { + if err = stream.Send(&planMetas[i]); err != nil { + return err } - cnt++ - planMeta := &tipb.PlanMeta{ - PlanDigest: []byte(key.(string)), - NormalizedPlan: planDecoded, - } - if err = stream.Send(planMeta); err != nil { - return false - } - return true - }) - // stream.Send return error - if err != nil { - return err + sentCount += 1 } - topSQLReportPlanCountHistogram.Observe(float64(cnt)) + + // See https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream for how to avoid leaking the stream _, err = stream.CloseAndRecv() - if err != nil { - reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds()) - return err - } - reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds()) - return err + return } // tryEstablishConnection establishes the gRPC connection if connection is not established. @@ -332,6 +321,6 @@ func (ds *SingleTargetDataSink) dial(ctx context.Context, targetRPCAddr string) } type sendTask struct { - data ReportData + data *ReportData deadline time.Time } diff --git a/util/topsql/topsql.go b/util/topsql/topsql.go index 67ceb24203..8167997b24 100644 --- a/util/topsql/topsql.go +++ b/util/topsql/topsql.go @@ -40,8 +40,8 @@ var globalTopSQLReport reporter.TopSQLReporter // SetupTopSQL sets up the top-sql worker. func SetupTopSQL() { - ds := reporter.NewSingleTargetDataSink(plancodec.DecodeNormalizedPlan) - globalTopSQLReport = reporter.NewRemoteTopSQLReporter(ds) + ds := reporter.NewSingleTargetDataSink() + globalTopSQLReport = reporter.NewRemoteTopSQLReporter(ds, plancodec.DecodeNormalizedPlan) tracecpu.GlobalSQLCPUProfiler.SetCollector(globalTopSQLReport) tracecpu.GlobalSQLCPUProfiler.Run() } diff --git a/util/topsql/topsql_test.go b/util/topsql/topsql_test.go index d9ec679919..d2e8bbf96b 100644 --- a/util/topsql/topsql_test.go +++ b/util/topsql/topsql_test.go @@ -115,8 +115,8 @@ func TestTopSQLReporter(t *testing.T) { conf.TopSQL.ReceiverAddress = server.Address() }) - client := reporter.NewSingleTargetDataSink(mockPlanBinaryDecoderFunc) - report := reporter.NewRemoteTopSQLReporter(client) + dataSink := reporter.NewSingleTargetDataSink() + report := reporter.NewRemoteTopSQLReporter(dataSink, mockPlanBinaryDecoderFunc) defer report.Close() tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{report})
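To illustrate the contract introduced by this patch: a DataSink now receives tipb messages that the reporter has already assembled and decoded, instead of raw dataPoints and sync.Maps. The following is a minimal, hypothetical sketch (not part of the patch) written against the updated TrySend signature; loggingDataSink is an invented name, the file is assumed to live in the reporter package, and only TrySend and IsPaused are shown, so it may not satisfy every method of the actual DataSink interface.

// loggingDataSink is an illustrative toy sink: it only logs the sizes of the
// report slices that the reporter has already built centrally. It is assumed
// to live in package reporter; the real DataSink interface may require
// additional methods beyond the two sketched here.
package reporter

import (
	"time"

	"github.com/pingcap/tidb/util/logutil"
	"go.uber.org/zap"
)

type loggingDataSink struct{}

// TrySend matches the updated DataSink signature, which now takes the
// centrally built *ReportData instead of raw maps and dataPoints.
func (s *loggingDataSink) TrySend(data *ReportData, deadline time.Time) error {
	logutil.BgLogger().Info("[top-sql] report ready",
		zap.Int("cpuTimeRecords", len(data.CPUTimeRecords)),
		zap.Int("sqlMetas", len(data.SQLMetas)),
		zap.Int("planMetas", len(data.PlanMetas)),
		zap.Time("deadline", deadline))
	return nil
}

// IsPaused reports whether the sink wants the reporter to hold records; this
// toy sink never pauses.
func (s *loggingDataSink) IsPaused() bool { return false }

If such a sink implemented the full interface, it could be wired in through the constructor changed by this patch, e.g. NewRemoteTopSQLReporter(&loggingDataSink{}, plancodec.DecodeNormalizedPlan).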