topsql: a centralized place to generate tipb report data (#30781)

Zhenchi
2021-12-16 16:06:35 +08:00
committed by GitHub
parent 5eac82b052
commit d208b62025
7 changed files with 147 additions and 120 deletions

View File

@ -1536,7 +1536,7 @@ func TestTopSQLAgent(t *testing.T) {
dbt.MustExec("set @@global.tidb_top_sql_report_interval_seconds=2;")
dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;")
r := reporter.NewRemoteTopSQLReporter(reporter.NewSingleTargetDataSink(plancodec.DecodeNormalizedPlan))
r := reporter.NewRemoteTopSQLReporter(reporter.NewSingleTargetDataSink(), plancodec.DecodeNormalizedPlan)
tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{r})
// TODO: change to ensure that the right sql statements are reported, not just counts

View File

@ -21,7 +21,7 @@ type DataSink interface {
// TrySend pushes report data into the sink, which will later be sent to a target by the sink. A deadline can be
// specified to control how late it should be sent. If the sink is kept full and cannot schedule a send within
// the specified deadline, or the sink is closed, an error will be returned.
TrySend(data ReportData, deadline time.Time) error
TrySend(data *ReportData, deadline time.Time) error
// IsPaused indicates that the DataSink is not expecting to receive records for now
// and may resume in the future.

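In practice, the new contract for a sink implementation is that TrySend receives a *ReportData and must give up once the deadline passes. Below is a minimal sketch, written as if inside the reporter package (only the standard "errors" and "time" packages are needed); the channel-backed sink is hypothetical and the remaining DataSink methods (such as IsPaused) are omitted.

// channelDataSink is a hypothetical sink that hands finished reports to a consumer goroutine.
type channelDataSink struct {
	reports chan *ReportData
}

// TrySend enqueues the report, or returns an error once the deadline has passed.
func (s *channelDataSink) TrySend(data *ReportData, deadline time.Time) error {
	select {
	case s.reports <- data:
		return nil
	case <-time.After(time.Until(deadline)):
		return errors.New("channelDataSink: deadline exceeded before the report could be queued")
	}
}
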
View File

@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/topsql/tracecpu"
"github.com/pingcap/tipb/go-tipb"
"github.com/wangjohn/quickselect"
atomic2 "go.uber.org/atomic"
"go.uber.org/zap"
@ -132,6 +133,9 @@ type RemoteTopSQLReporter struct {
collectCPUDataChan chan cpuData
reportCollectedDataChan chan collectedData
// calling decodePlan can take a while, so it should not block critical paths
decodePlan planBinaryDecodeFunc
}
// SQLMeta is the SQL meta, which contains the normalized SQL string and a bool field used to distinguish internal SQL.
@ -144,7 +148,7 @@ type SQLMeta struct {
//
// planBinaryDecoder is a decoding function which will be called asynchronously to decode the plan binary to a string
// MaxStatementsNum is the maximum number of SQL statements and plans, which restricts the memory usage of the internal LFU cache
func NewRemoteTopSQLReporter(dataSink DataSink) *RemoteTopSQLReporter {
func NewRemoteTopSQLReporter(dataSink DataSink, decodePlan planBinaryDecodeFunc) *RemoteTopSQLReporter {
ctx, cancel := context.WithCancel(context.Background())
tsr := &RemoteTopSQLReporter{
ctx: ctx,
@ -152,6 +156,7 @@ func NewRemoteTopSQLReporter(dataSink DataSink) *RemoteTopSQLReporter {
dataSink: dataSink,
collectCPUDataChan: make(chan cpuData, 1),
reportCollectedDataChan: make(chan collectedData, 1),
decodePlan: decodePlan,
}
tsr.normalizedSQLMap.Store(&sync.Map{})
tsr.normalizedPlanMap.Store(&sync.Map{})
@ -259,8 +264,8 @@ func addEvictedCPUTime(collectTarget map[string]*dataPoints, timestamp uint64, t
others.CPUTimeMsTotal += uint64(totalCPUTimeMs)
}
// addEvictedIntoSortedDataPoints adds the evict dataPoints into others.
// Attention, this function depend on others dataPoints is sorted, and this function will modify the evict dataPoints
// addEvictedIntoSortedDataPoints adds evicted dataPoints into others.
// Attention: this function depends on the others dataPoints being sorted, and it will modify the evicted dataPoints
// to make sure it is sorted by timestamp.
func addEvictedIntoSortedDataPoints(others *dataPoints, evict *dataPoints) *dataPoints {
if others == nil {
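The contract in the comment above (others stays sorted, the evicted series is re-sorted, and matching timestamps are merged) is easier to picture with a standalone sketch. This is an illustration of the idea only, not the TiDB implementation, and the element types are assumptions.

// mergeSortedSeries interleaves two parallel (timestamp, cpuMs) series that are each
// sorted by timestamp, summing the CPU time where timestamps collide, so the result
// stays sorted. addEvictedIntoSortedDataPoints does the same kind of work when it
// folds an evicted dataPoints into `others`.
func mergeSortedSeries(ts1 []uint64, cpu1 []uint32, ts2 []uint64, cpu2 []uint32) ([]uint64, []uint32) {
	ts := make([]uint64, 0, len(ts1)+len(ts2))
	cpu := make([]uint32, 0, len(cpu1)+len(cpu2))
	i, j := 0, 0
	for i < len(ts1) && j < len(ts2) {
		switch {
		case ts1[i] == ts2[j]:
			ts = append(ts, ts1[i])
			cpu = append(cpu, cpu1[i]+cpu2[j])
			i++
			j++
		case ts1[i] < ts2[j]:
			ts = append(ts, ts1[i])
			cpu = append(cpu, cpu1[i])
			i++
		default:
			ts = append(ts, ts2[j])
			cpu = append(cpu, cpu2[j])
			j++
		}
	}
	ts = append(ts, ts1[i:]...)
	cpu = append(cpu, cpu1[i:]...)
	ts = append(ts, ts2[j:]...)
	cpu = append(cpu, cpu2[j:]...)
	return ts, cpu
}
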
@ -451,29 +456,14 @@ type collectedData struct {
// ReportData contains the data that the reporter sends to the agent
type ReportData struct {
// collectedData contains the topN collected records and the `others` record which aggregation all records that is out of Top N.
collectedData []*dataPoints
normalizedSQLMap *sync.Map
normalizedPlanMap *sync.Map
// CPUTimeRecords contains the topN collected records and the `others` record, which aggregates all records outside the top N.
CPUTimeRecords []tipb.CPUTimeRecord
SQLMetas []tipb.SQLMeta
PlanMetas []tipb.PlanMeta
}
func (d *ReportData) hasData() bool {
if len(d.collectedData) > 0 {
return true
}
cnt := 0
d.normalizedSQLMap.Range(func(key, value interface{}) bool {
cnt++
return false
})
if cnt > 0 {
return true
}
d.normalizedPlanMap.Range(func(key, value interface{}) bool {
cnt++
return false
})
return cnt > 0
return len(d.CPUTimeRecords) != 0 || len(d.SQLMetas) != 0 || len(d.PlanMetas) != 0
}
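With this change, ReportData carries fully built tipb messages instead of the raw sync.Maps, so hasData reduces to three length checks. A rough package-internal illustration; the digests, timestamp, and element types below are assumptions for the example, not values taken from the commit.

// The reshaped ReportData is just three ready-to-send slices.
data := &ReportData{
	CPUTimeRecords: []tipb.CPUTimeRecord{{
		SqlDigest:              []byte("sql-digest-1"),
		PlanDigest:             []byte("plan-digest-1"),
		RecordListTimestampSec: []uint64{1639641600},
		RecordListCpuTimeMs:    []uint32{250},
	}},
	SQLMetas:  []tipb.SQLMeta{{SqlDigest: []byte("sql-digest-1"), NormalizedSql: "select * from t where a = ?"}},
	PlanMetas: []tipb.PlanMeta{{PlanDigest: []byte("plan-digest-1"), NormalizedPlan: "TableReader"}},
}
_ = data.hasData() // true; (&ReportData{}).hasData() would be false
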
// reportWorker sends data from the `reportCollectedDataChan` to the gRPC endpoint one by one.
@ -497,11 +487,17 @@ func (tsr *RemoteTopSQLReporter) reportWorker() {
// getReportData gets ReportData from the collectedData.
// This function calculates the topN collected records and the `others` record, which aggregates all records outside the top N.
func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) ReportData {
func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) *ReportData {
records := getTopNFromCollected(collected)
return tsr.buildReportData(records, collected.normalizedSQLMap, collected.normalizedPlanMap)
}
func getTopNFromCollected(collected collectedData) (records []*dataPoints) {
// Fetch TopN dataPoints.
others := collected.records[keyOthers]
delete(collected.records, keyOthers)
records := make([]*dataPoints, 0, len(collected.records))
records = make([]*dataPoints, 0, len(collected.records))
for _, v := range collected.records {
records = append(records, v)
}
@ -514,7 +510,7 @@ func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) ReportDa
sort.Sort(others)
}
for _, evict := range evicted {
// SQL meta will not be evicted, since the evicted SQL can be appear on Other components (TiKV) TopN records.
// SQL meta will not be evicted, since the evicted SQL can still appear in other components' (TiKV) TopN records.
others = addEvictedIntoSortedDataPoints(others, evict)
}
@ -523,14 +519,56 @@ func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) ReportDa
records = append(records, others)
}
return ReportData{
collectedData: records,
normalizedSQLMap: collected.normalizedSQLMap,
normalizedPlanMap: collected.normalizedPlanMap,
}
return
}
func (tsr *RemoteTopSQLReporter) doReport(data ReportData) {
// buildReportData converts the record data in the dataPoints slice and the metadata in the sync.Maps into a ReportData.
//
// Attention: the caller should guarantee that no other readers or writers access `sqlMap` and `planMap`, because buildReportData
// does heavy work inside sync.Map.Range and may block other readers and writers.
func (tsr *RemoteTopSQLReporter) buildReportData(records []*dataPoints, sqlMap *sync.Map, planMap *sync.Map) *ReportData {
res := &ReportData{
CPUTimeRecords: make([]tipb.CPUTimeRecord, 0, len(records)),
SQLMetas: make([]tipb.SQLMeta, 0, len(records)),
PlanMetas: make([]tipb.PlanMeta, 0, len(records)),
}
for _, record := range records {
res.CPUTimeRecords = append(res.CPUTimeRecords, tipb.CPUTimeRecord{
RecordListTimestampSec: record.TimestampList,
RecordListCpuTimeMs: record.CPUTimeMsList,
SqlDigest: record.SQLDigest,
PlanDigest: record.PlanDigest,
})
}
sqlMap.Range(func(key, value interface{}) bool {
meta := value.(SQLMeta)
res.SQLMetas = append(res.SQLMetas, tipb.SQLMeta{
SqlDigest: []byte(key.(string)),
NormalizedSql: meta.normalizedSQL,
IsInternalSql: meta.isInternal,
})
return true
})
planMap.Range(func(key, value interface{}) bool {
planDecoded, errDecode := tsr.decodePlan(value.(string))
if errDecode != nil {
logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(errDecode))
return true
}
res.PlanMetas = append(res.PlanMetas, tipb.PlanMeta{
PlanDigest: []byte(key.(string)),
NormalizedPlan: planDecoded,
})
return true
})
return res
}
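Because the plan decoder now lives on the reporter, every DataSink receives plans that are already decoded, and buildReportData is the one place where dataPoints plus the two metadata maps become tipb messages. Below is a hypothetical package-internal test sketch of that conversion; the inline decoder, the digests, and the dataPoints field types are assumptions made for illustration.

package reporter

import (
	"sync"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestBuildReportDataSketch(t *testing.T) {
	// A stand-in decoder instead of plancodec.DecodeNormalizedPlan.
	r := NewRemoteTopSQLReporter(NewSingleTargetDataSink(), func(plan string) (string, error) {
		return "decoded: " + plan, nil
	})
	defer r.Close()

	sqlMap, planMap := &sync.Map{}, &sync.Map{}
	sqlMap.Store("sql-digest-1", SQLMeta{normalizedSQL: "select * from t", isInternal: false})
	planMap.Store("plan-digest-1", "raw-plan-binary")

	records := []*dataPoints{{
		SQLDigest:     []byte("sql-digest-1"),
		PlanDigest:    []byte("plan-digest-1"),
		TimestampList: []uint64{1639641600},
		CPUTimeMsList: []uint32{100},
	}}

	data := r.buildReportData(records, sqlMap, planMap)
	require.Len(t, data.CPUTimeRecords, 1)
	require.Len(t, data.SQLMetas, 1)
	require.Equal(t, "decoded: raw-plan-binary", data.PlanMetas[0].NormalizedPlan)
}
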
func (tsr *RemoteTopSQLReporter) doReport(data *ReportData) {
defer util.Recover("top-sql", "doReport", nil, false)
if !data.hasData() {

View File

@ -72,8 +72,8 @@ func setupRemoteTopSQLReporter(maxStatementsNum, interval int, addr string) *Rem
conf.TopSQL.ReceiverAddress = addr
})
rc := NewSingleTargetDataSink(mockPlanBinaryDecoderFunc)
ts := NewRemoteTopSQLReporter(rc)
rc := NewSingleTargetDataSink()
ts := NewRemoteTopSQLReporter(rc, mockPlanBinaryDecoderFunc)
return ts
}

View File

@ -37,13 +37,10 @@ type SingleTargetDataSink struct {
curRPCAddr string
conn *grpc.ClientConn
sendTaskCh chan sendTask
// calling decodePlan this can take a while, so should not block critical paths
decodePlan planBinaryDecodeFunc
}
// NewSingleTargetDataSink returns a new SingleTargetDataSink
func NewSingleTargetDataSink(decodePlan planBinaryDecodeFunc) *SingleTargetDataSink {
func NewSingleTargetDataSink() *SingleTargetDataSink {
ctx, cancel := context.WithCancel(context.Background())
dataSink := &SingleTargetDataSink{
ctx: ctx,
@ -52,8 +49,6 @@ func NewSingleTargetDataSink(decodePlan planBinaryDecodeFunc) *SingleTargetDataS
curRPCAddr: "",
conn: nil,
sendTaskCh: make(chan sendTask, 1),
decodePlan: decodePlan,
}
go dataSink.recoverRun()
return dataSink
@ -107,7 +102,7 @@ var _ DataSink = &SingleTargetDataSink{}
// TrySend implements the DataSink interface.
// Currently the implementation will establish a new connection every time,
// which is suitable for a per-minute sending period
func (ds *SingleTargetDataSink) TrySend(data ReportData, deadline time.Time) error {
func (ds *SingleTargetDataSink) TrySend(data *ReportData, deadline time.Time) error {
select {
case ds.sendTaskCh <- sendTask{data: data, deadline: deadline}:
return nil
@ -147,7 +142,7 @@ func (ds *SingleTargetDataSink) Close() {
ds.conn = nil
}
func (ds *SingleTargetDataSink) doSend(ctx context.Context, addr string, data ReportData) error {
func (ds *SingleTargetDataSink) doSend(ctx context.Context, addr string, data *ReportData) error {
err := ds.tryEstablishConnection(ctx, addr)
if err != nil {
return err
@ -159,15 +154,15 @@ func (ds *SingleTargetDataSink) doSend(ctx context.Context, addr string, data Re
go func() {
defer wg.Done()
errCh <- ds.sendBatchSQLMeta(ctx, data.normalizedSQLMap)
errCh <- ds.sendBatchSQLMeta(ctx, data.SQLMetas)
}()
go func() {
defer wg.Done()
errCh <- ds.sendBatchPlanMeta(ctx, data.normalizedPlanMap)
errCh <- ds.sendBatchPlanMeta(ctx, data.PlanMetas)
}()
go func() {
defer wg.Done()
errCh <- ds.sendBatchCPUTimeRecord(ctx, data.collectedData)
errCh <- ds.sendBatchCPUTimeRecord(ctx, data.CPUTimeRecords)
}()
wg.Wait()
close(errCh)
@ -180,111 +175,105 @@ func (ds *SingleTargetDataSink) doSend(ctx context.Context, addr string, data Re
}
// sendBatchCPUTimeRecord sends a batch of TopSQL records by stream.
func (ds *SingleTargetDataSink) sendBatchCPUTimeRecord(ctx context.Context, records []*dataPoints) error {
func (ds *SingleTargetDataSink) sendBatchCPUTimeRecord(ctx context.Context, records []tipb.CPUTimeRecord) (err error) {
if len(records) == 0 {
return nil
}
start := time.Now()
sentCount := 0
defer func() {
topSQLReportRecordCounterHistogram.Observe(float64(sentCount))
if err != nil {
reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds())
} else {
reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds())
}
}()
client := tipb.NewTopSQLAgentClient(ds.conn)
stream, err := client.ReportCPUTimeRecords(ctx)
if err != nil {
return err
}
for _, record := range records {
record := &tipb.CPUTimeRecord{
RecordListTimestampSec: record.TimestampList,
RecordListCpuTimeMs: record.CPUTimeMsList,
SqlDigest: record.SQLDigest,
PlanDigest: record.PlanDigest,
}
if err := stream.Send(record); err != nil {
return err
for i := range records {
if err = stream.Send(&records[i]); err != nil {
return
}
sentCount += 1
}
topSQLReportRecordCounterHistogram.Observe(float64(len(records)))
// See https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream for how to avoid leaking the stream
_, err = stream.CloseAndRecv()
if err != nil {
reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds())
return err
}
reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds())
return nil
return
}
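The send helper above, and the two that follow, share a pattern worth spelling out: a named error return lets a single deferred closure record the sent count and route the elapsed time into the success or failure histogram no matter which return path fired. Here is a self-contained sketch of just that pattern, using hypothetical metric names rather than TiDB's own metrics.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	sentCountHist    = prometheus.NewHistogram(prometheus.HistogramOpts{Name: "demo_sent_count"})
	sendSuccDuration = prometheus.NewHistogram(prometheus.HistogramOpts{Name: "demo_send_duration_succ_seconds"})
	sendFailDuration = prometheus.NewHistogram(prometheus.HistogramOpts{Name: "demo_send_duration_fail_seconds"})
)

// sendBatch streams items one by one; the deferred closure observes the final
// value of the named return err, so the metrics stay correct on every exit path.
func sendBatch(items []string, send func(string) error) (err error) {
	start := time.Now()
	sentCount := 0
	defer func() {
		sentCountHist.Observe(float64(sentCount))
		if err != nil {
			sendFailDuration.Observe(time.Since(start).Seconds())
		} else {
			sendSuccDuration.Observe(time.Since(start).Seconds())
		}
	}()
	for _, item := range items {
		if err = send(item); err != nil {
			return
		}
		sentCount++
	}
	return
}

func main() {
	err := sendBatch([]string{"a", "b"}, func(s string) error {
		if s == "b" {
			return errors.New("boom")
		}
		return nil
	})
	fmt.Println(err) // boom; one item counted as sent, failure duration observed
}
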
// sendBatchSQLMeta sends a batch of SQL metas by stream.
func (ds *SingleTargetDataSink) sendBatchSQLMeta(ctx context.Context, sqlMap *sync.Map) error {
func (ds *SingleTargetDataSink) sendBatchSQLMeta(ctx context.Context, sqlMetas []tipb.SQLMeta) (err error) {
if len(sqlMetas) == 0 {
return
}
start := time.Now()
sentCount := 0
defer func() {
topSQLReportSQLCountHistogram.Observe(float64(sentCount))
if err != nil {
reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds())
} else {
reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds())
}
}()
client := tipb.NewTopSQLAgentClient(ds.conn)
stream, err := client.ReportSQLMeta(ctx)
if err != nil {
return err
}
cnt := 0
sqlMap.Range(func(key, value interface{}) bool {
cnt++
meta := value.(SQLMeta)
sqlMeta := &tipb.SQLMeta{
SqlDigest: []byte(key.(string)),
NormalizedSql: meta.normalizedSQL,
IsInternalSql: meta.isInternal,
for i := range sqlMetas {
if err = stream.Send(&sqlMetas[i]); err != nil {
return
}
if err = stream.Send(sqlMeta); err != nil {
return false
}
return true
})
// stream.Send return error
if err != nil {
return err
sentCount += 1
}
topSQLReportSQLCountHistogram.Observe(float64(cnt))
// See https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream for how to avoid leaking the stream
_, err = stream.CloseAndRecv()
if err != nil {
reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds())
return err
}
reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds())
return nil
return
}
// sendBatchPlanMeta sends a batch of plan metas by stream.
func (ds *SingleTargetDataSink) sendBatchPlanMeta(ctx context.Context, planMap *sync.Map) error {
func (ds *SingleTargetDataSink) sendBatchPlanMeta(ctx context.Context, planMetas []tipb.PlanMeta) (err error) {
if len(planMetas) == 0 {
return nil
}
start := time.Now()
sentCount := 0
defer func() {
topSQLReportPlanCountHistogram.Observe(float64(sentCount))
if err != nil {
reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds())
} else {
reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds())
}
}()
client := tipb.NewTopSQLAgentClient(ds.conn)
stream, err := client.ReportPlanMeta(ctx)
if err != nil {
return err
}
cnt := 0
planMap.Range(func(key, value interface{}) bool {
planDecoded, errDecode := ds.decodePlan(value.(string))
if errDecode != nil {
logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(errDecode))
return true
for i := range planMetas {
if err = stream.Send(&planMetas[i]); err != nil {
return err
}
cnt++
planMeta := &tipb.PlanMeta{
PlanDigest: []byte(key.(string)),
NormalizedPlan: planDecoded,
}
if err = stream.Send(planMeta); err != nil {
return false
}
return true
})
// stream.Send return error
if err != nil {
return err
sentCount += 1
}
topSQLReportPlanCountHistogram.Observe(float64(cnt))
// See https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream for how to avoid leaking the stream
_, err = stream.CloseAndRecv()
if err != nil {
reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds())
return err
}
reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds())
return err
return
}
// tryEstablishConnection establishes the gRPC connection if it is not already established.
@ -332,6 +321,6 @@ func (ds *SingleTargetDataSink) dial(ctx context.Context, targetRPCAddr string)
}
type sendTask struct {
data ReportData
data *ReportData
deadline time.Time
}

View File

@ -40,8 +40,8 @@ var globalTopSQLReport reporter.TopSQLReporter
// SetupTopSQL sets up the top-sql worker.
func SetupTopSQL() {
ds := reporter.NewSingleTargetDataSink(plancodec.DecodeNormalizedPlan)
globalTopSQLReport = reporter.NewRemoteTopSQLReporter(ds)
ds := reporter.NewSingleTargetDataSink()
globalTopSQLReport = reporter.NewRemoteTopSQLReporter(ds, plancodec.DecodeNormalizedPlan)
tracecpu.GlobalSQLCPUProfiler.SetCollector(globalTopSQLReport)
tracecpu.GlobalSQLCPUProfiler.Run()
}

View File

@ -115,8 +115,8 @@ func TestTopSQLReporter(t *testing.T) {
conf.TopSQL.ReceiverAddress = server.Address()
})
client := reporter.NewSingleTargetDataSink(mockPlanBinaryDecoderFunc)
report := reporter.NewRemoteTopSQLReporter(client)
dataSink := reporter.NewSingleTargetDataSink()
report := reporter.NewRemoteTopSQLReporter(dataSink, mockPlanBinaryDecoderFunc)
defer report.Close()
tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{report})