From fc52e147deb4ace85d66f7efe4d5c345d2bead1a Mon Sep 17 00:00:00 2001 From: Rustin Date: Thu, 13 Mar 2025 21:08:27 +0800 Subject: [PATCH] stats: remove useless timeout refresh and information schema reference (#60012) close pingcap/tidb#60011 --- pkg/domain/domain.go | 2 - pkg/statistics/handle/handle.go | 3 +- .../handle/handletest/handle_test.go | 1 - .../handle/syncload/stats_syncload.go | 69 ++++++++++++------- pkg/statistics/handle/types/interfaces.go | 27 -------- 5 files changed, 45 insertions(+), 57 deletions(-) diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 9c0a92803c..c6f2baa24b 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -2445,7 +2445,6 @@ func (do *Domain) CreateStatsHandle(ctx, initStatsCtx sessionctx.Context) error ctx, initStatsCtx, do.statsLease, - do.InfoSchema(), do.sysSessionPool, &do.sysProcesses, do.ddlNotifier, @@ -2492,7 +2491,6 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err ctx, initStatsCtx, do.statsLease, - do.InfoSchema(), do.sysSessionPool, &do.sysProcesses, do.ddlNotifier, diff --git a/pkg/statistics/handle/handle.go b/pkg/statistics/handle/handle.go index 1a3923824b..171ae9979c 100644 --- a/pkg/statistics/handle/handle.go +++ b/pkg/statistics/handle/handle.go @@ -114,7 +114,6 @@ func NewHandle( _, /* ctx, keep it for feature usage */ initStatsCtx sessionctx.Context, lease time.Duration, - is infoschema.InfoSchema, pool pkgutil.DestroyableSessionPool, tracker sysproctrack.Tracker, ddlNotifier *notifier.DDLNotifier, @@ -141,7 +140,7 @@ func NewHandle( handle.StatsHistory = history.NewStatsHistory(handle) handle.StatsUsage = usage.NewStatsUsageImpl(handle) handle.StatsAnalyze = autoanalyze.NewStatsAnalyze(handle, tracker, ddlNotifier) - handle.StatsSyncLoad = syncload.NewStatsSyncLoad(is, handle) + handle.StatsSyncLoad = syncload.NewStatsSyncLoad(handle) handle.StatsGlobal = globalstats.NewStatsGlobal(handle) handle.DDL = ddl.NewDDLHandler( handle.StatsReadWriter, 
diff --git a/pkg/statistics/handle/handletest/handle_test.go b/pkg/statistics/handle/handletest/handle_test.go index aee3485ee5..50bbb6c6ff 100644 --- a/pkg/statistics/handle/handletest/handle_test.go +++ b/pkg/statistics/handle/handletest/handle_test.go @@ -123,7 +123,6 @@ func TestVersion(t *testing.T) { testKit.Session(), testKit2.Session(), time.Millisecond, - is, do.SysSessionPool(), do.SysProcTracker(), do.DDLNotifier(), diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go index 600e632fda..2b6b9aa57b 100644 --- a/pkg/statistics/handle/syncload/stats_syncload.go +++ b/pkg/statistics/handle/syncload/stats_syncload.go @@ -64,10 +64,27 @@ func GetSyncLoadConcurrencyByCPU() int { return 10 } +// statsSyncLoad is used to load statistics synchronously when needed by SQL queries. +// +// It maintains two channels for handling statistics load tasks: +// - NeededItemsCh: High priority channel for tasks that haven't timed out yet (Higher priority) +// - TimeoutItemsCh: Lower priority channel for tasks that exceeded their timeout (Lower priority) +// +// The main workflow: +// 1. collect_column_stats_usage rule requests statistics via SendLoadRequests +// 2. Tasks are created and placed in channels +// 3. Worker goroutines pick up tasks from channels +// 4. Statistics are loaded from storage +// 5. Loaded statistics are cached via updateCachedItem for future use +// 6. Results are checked and stats are used in the SQL query +// +// It uses singleflight pattern to deduplicate concurrent requests for the same statistics. +// Requests that exceed their timeout are moved to a lower priority channel to be processed +// when there are no urgent requests. 
type statsSyncLoad struct { - statsHandle statstypes.StatsHandle - is infoschema.InfoSchema - StatsLoad statstypes.StatsLoad + statsHandle statstypes.StatsHandle + neededItemsCh chan *statstypes.NeededItemTask + timeoutItemsCh chan *statstypes.NeededItemTask // This mutex protects the statsCache from concurrent modifications by multiple workers. // Since multiple workers may update the statsCache for the same table simultaneously, // the mutex ensures thread-safety during these updates. @@ -77,11 +94,11 @@ type statsSyncLoad struct { var globalStatsSyncLoadSingleFlight singleflight.Group // NewStatsSyncLoad creates a new StatsSyncLoad. -func NewStatsSyncLoad(is infoschema.InfoSchema, statsHandle statstypes.StatsHandle) statstypes.StatsSyncLoad { - s := &statsSyncLoad{statsHandle: statsHandle, is: is} +func NewStatsSyncLoad(statsHandle statstypes.StatsHandle) statstypes.StatsSyncLoad { + s := &statsSyncLoad{statsHandle: statsHandle} cfg := config.GetGlobalConfig() - s.StatsLoad.NeededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize) - s.StatsLoad.TimeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize) + s.neededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize) + s.timeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize) return s } @@ -121,7 +138,7 @@ func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHis ResultCh: make(chan stmtctx.StatsLoadResult, 1), } select { - case s.StatsLoad.NeededItemsCh <- task: + case s.neededItemsCh <- task: metrics.SyncLoadDedupCounter.Inc() select { case <-timer.C: @@ -219,7 +236,7 @@ func (s *statsSyncLoad) AppendNeededItem(task *statstypes.NeededItemTask, timeou timer := time.NewTimer(timeout) defer timer.Stop() select { - case s.StatsLoad.NeededItemsCh <- task: + case s.neededItemsCh <- task: case <-timer.C: return errors.New("Channel is full and timeout writing to 
channel") } @@ -258,7 +275,9 @@ func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{ // - If the task is handled successfully, return nil, nil. // - If the task is timeout, return the task and nil. The caller should retry the timeout task without sleep. // - If the task is failed, return the task, error. The caller should retry the timeout task with sleep. -func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) { +// +// TODO: remove this session context, it's not necessary. +func (s *statsSyncLoad) HandleOneTask(_ sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) { defer func() { // recover for each task, worker keeps working if r := recover(); r != nil { @@ -267,7 +286,7 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty } }() if lastTask == nil { - task, err = s.drainColTask(sctx, exit) + task, err = s.drainColTask(exit) if err != nil { if err != errExit { logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err)) @@ -497,41 +516,41 @@ func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.Ta return w, nil } -// drainColTask will hang until a column task can return, and either task or error will be returned. -func (s *statsSyncLoad) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*statstypes.NeededItemTask, error) { - // select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh +// drainColTask will hang until a task can return, and either task or error will be returned. +// The task will be drained from NeededItemsCh first, if no task, then TimeoutItemsCh. 
+func (s *statsSyncLoad) drainColTask(exit chan struct{}) (*statstypes.NeededItemTask, error) { + // select NeededItemsCh first, if no task, then select TimeoutItemsCh for { select { case <-exit: return nil, errExit - case task, ok := <-s.StatsLoad.NeededItemsCh: + case task, ok := <-s.neededItemsCh: if !ok { - return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed") + return nil, errors.New("drainColTask: cannot read from NeededItemsCh, maybe the chan is closed") } // if the task has already timeout, no sql is sync-waiting for it, // so do not handle it just now, put it to another channel with lower priority if time.Now().After(task.ToTimeout) { - task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond) - s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task) + s.writeToTimeoutChan(s.timeoutItemsCh, task) continue } return task, nil - case task, ok := <-s.StatsLoad.TimeoutItemsCh: + case task, ok := <-s.timeoutItemsCh: select { case <-exit: return nil, errExit - case task0, ok0 := <-s.StatsLoad.NeededItemsCh: + case task0, ok0 := <-s.neededItemsCh: if !ok0 { - return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed") + return nil, errors.New("drainColTask: cannot read from NeededItemsCh, maybe the chan is closed") } - // send task back to TimeoutColumnsCh and return the task drained from NeededColumnsCh - s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task) + // send task back to TimeoutItemsCh and return the task drained from NeededItemsCh + s.writeToTimeoutChan(s.timeoutItemsCh, task) return task0, nil default: if !ok { - return nil, errors.New("drainColTask: cannot read from TimeoutColumnsCh, maybe the chan is closed") + return nil, errors.New("drainColTask: cannot read from TimeoutItemsCh, maybe the chan is closed") } - // NeededColumnsCh is empty now, handle task from TimeoutColumnsCh + // NeededItemsCh is empty now, handle task
from TimeoutItemsCh return task, nil } } diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index c423465af3..6f859989ae 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -439,33 +439,6 @@ type NeededItemTask struct { Retry int } -// StatsLoad is used to load stats concurrently -// TODO(hawkingrei): Our implementation of loading statistics is flawed. -// Currently, we enqueue tasks that require loading statistics into a channel, -// from which workers retrieve tasks to process. Then, using the singleflight mechanism, -// we filter out duplicate tasks. However, the issue with this approach is that it does -// not filter out all duplicate tasks, but only the duplicates within the number of workers. -// Such an implementation is not reasonable. -// -// We should first filter all tasks through singleflight as shown in the diagram, and then use workers to load stats. -// -// ┌─────────▼──────────▼─────────────▼──────────────▼────────────────▼────────────────────┐ -// │ │ -// │ singleflight │ -// │ │ -// └───────────────────────────────────────────────────────────────────────────────────────┘ -// -// │ │ -// ┌────────────▼──────┐ ┌───────▼───────────┐ -// │ │ │ │ -// │ syncload worker │ │ syncload worker │ -// │ │ │ │ -// └───────────────────┘ └───────────────────┘ -type StatsLoad struct { - NeededItemsCh chan *NeededItemTask - TimeoutItemsCh chan *NeededItemTask -} - // StatsSyncLoad implement the sync-load feature. type StatsSyncLoad interface { // SendLoadRequests sends load requests to the channel.