|
|
|
|
@ -64,10 +64,27 @@ func GetSyncLoadConcurrencyByCPU() int {
|
|
|
|
|
return 10
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// statsSyncLoad is used to load statistics synchronously when needed by SQL queries.
|
|
|
|
|
//
|
|
|
|
|
// It maintains two channels for handling statistics load tasks:
|
|
|
|
|
// - NeededItemsCh: high-priority channel for tasks that haven't timed out yet
|
|
|
|
|
// - TimeoutItemsCh: low-priority channel for tasks that have exceeded their timeout
|
|
|
|
|
//
|
|
|
|
|
// The main workflow:
|
|
|
|
|
// 1. collect_column_stats_usage rule requests statistics via SendLoadRequests
|
|
|
|
|
// 2. Tasks are created and placed in channels
|
|
|
|
|
// 3. Worker goroutines pick up tasks from channels
|
|
|
|
|
// 4. Statistics are loaded from storage
|
|
|
|
|
// 5. Loaded statistics are cached via updateCachedItem for future use
|
|
|
|
|
// 6. Results are checked and stats are used in the SQL query
|
|
|
|
|
//
|
|
|
|
|
// It uses the singleflight pattern to deduplicate concurrent requests for the same statistics.
|
|
|
|
|
// Requests that exceed their timeout are moved to a lower priority channel to be processed
|
|
|
|
|
// when there are no urgent requests.
|
|
|
|
|
type statsSyncLoad struct {
|
|
|
|
|
statsHandle statstypes.StatsHandle
|
|
|
|
|
is infoschema.InfoSchema
|
|
|
|
|
StatsLoad statstypes.StatsLoad
|
|
|
|
|
statsHandle statstypes.StatsHandle
|
|
|
|
|
neededItemsCh chan *statstypes.NeededItemTask
|
|
|
|
|
timeoutItemsCh chan *statstypes.NeededItemTask
|
|
|
|
|
// This mutex protects the statsCache from concurrent modifications by multiple workers.
|
|
|
|
|
// Since multiple workers may update the statsCache for the same table simultaneously,
|
|
|
|
|
// the mutex ensures thread-safety during these updates.
|
|
|
|
|
@ -77,11 +94,11 @@ type statsSyncLoad struct {
|
|
|
|
|
var globalStatsSyncLoadSingleFlight singleflight.Group
|
|
|
|
|
|
|
|
|
|
// NewStatsSyncLoad creates a new StatsSyncLoad.
|
|
|
|
|
func NewStatsSyncLoad(is infoschema.InfoSchema, statsHandle statstypes.StatsHandle) statstypes.StatsSyncLoad {
|
|
|
|
|
s := &statsSyncLoad{statsHandle: statsHandle, is: is}
|
|
|
|
|
func NewStatsSyncLoad(statsHandle statstypes.StatsHandle) statstypes.StatsSyncLoad {
|
|
|
|
|
s := &statsSyncLoad{statsHandle: statsHandle}
|
|
|
|
|
cfg := config.GetGlobalConfig()
|
|
|
|
|
s.StatsLoad.NeededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
|
|
|
|
|
s.StatsLoad.TimeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
|
|
|
|
|
s.neededItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
|
|
|
|
|
s.timeoutItemsCh = make(chan *statstypes.NeededItemTask, cfg.Performance.StatsLoadQueueSize)
|
|
|
|
|
return s
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -121,7 +138,7 @@ func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHis
|
|
|
|
|
ResultCh: make(chan stmtctx.StatsLoadResult, 1),
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case s.StatsLoad.NeededItemsCh <- task:
|
|
|
|
|
case s.neededItemsCh <- task:
|
|
|
|
|
metrics.SyncLoadDedupCounter.Inc()
|
|
|
|
|
select {
|
|
|
|
|
case <-timer.C:
|
|
|
|
|
@ -219,7 +236,7 @@ func (s *statsSyncLoad) AppendNeededItem(task *statstypes.NeededItemTask, timeou
|
|
|
|
|
timer := time.NewTimer(timeout)
|
|
|
|
|
defer timer.Stop()
|
|
|
|
|
select {
|
|
|
|
|
case s.StatsLoad.NeededItemsCh <- task:
|
|
|
|
|
case s.neededItemsCh <- task:
|
|
|
|
|
case <-timer.C:
|
|
|
|
|
return errors.New("Channel is full and timeout writing to channel")
|
|
|
|
|
}
|
|
|
|
|
@ -258,7 +275,9 @@ func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{
|
|
|
|
|
// - If the task is handled successfully, return nil, nil.
|
|
|
|
|
// - If the task times out, return the task and nil. The caller should retry the timed-out task without sleep.
|
|
|
|
|
// - If the task fails, return the task and the error. The caller should retry the failed task with sleep.
|
|
|
|
|
func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) {
|
|
|
|
|
//
|
|
|
|
|
// TODO: remove this session context, it's not necessary.
|
|
|
|
|
func (s *statsSyncLoad) HandleOneTask(_ sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) {
|
|
|
|
|
defer func() {
|
|
|
|
|
// recover for each task, worker keeps working
|
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
|
@ -267,7 +286,7 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
if lastTask == nil {
|
|
|
|
|
task, err = s.drainColTask(sctx, exit)
|
|
|
|
|
task, err = s.drainColTask(exit)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if err != errExit {
|
|
|
|
|
logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
|
|
|
|
|
@ -497,41 +516,41 @@ func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.Ta
|
|
|
|
|
return w, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// drainColTask will hang until a column task can return, and either task or error will be returned.
|
|
|
|
|
func (s *statsSyncLoad) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*statstypes.NeededItemTask, error) {
|
|
|
|
|
// select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh
|
|
|
|
|
// drainColTask will hang until a task can return, and either task or error will be returned.
|
|
|
|
|
// The task will be drained from NeededItemsCh first, if no task, then TimeoutItemsCh.
|
|
|
|
|
func (s *statsSyncLoad) drainColTask(exit chan struct{}) (*statstypes.NeededItemTask, error) {
|
|
|
|
|
// select NeededItemsCh first, if no task, then select TimeoutItemsCh
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-exit:
|
|
|
|
|
return nil, errExit
|
|
|
|
|
case task, ok := <-s.StatsLoad.NeededItemsCh:
|
|
|
|
|
case task, ok := <-s.neededItemsCh:
|
|
|
|
|
if !ok {
|
|
|
|
|
return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed")
|
|
|
|
|
return nil, errors.New("drainColTask: cannot read from NeededItemsCh, maybe the chan is closed")
|
|
|
|
|
}
|
|
|
|
|
// if the task has already timed out, no SQL is sync-waiting for it,
|
|
|
|
|
// so do not handle it just now, put it to another channel with lower priority
|
|
|
|
|
if time.Now().After(task.ToTimeout) {
|
|
|
|
|
task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
|
|
|
|
|
s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task)
|
|
|
|
|
s.writeToTimeoutChan(s.timeoutItemsCh, task)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
return task, nil
|
|
|
|
|
case task, ok := <-s.StatsLoad.TimeoutItemsCh:
|
|
|
|
|
case task, ok := <-s.timeoutItemsCh:
|
|
|
|
|
select {
|
|
|
|
|
case <-exit:
|
|
|
|
|
return nil, errExit
|
|
|
|
|
case task0, ok0 := <-s.StatsLoad.NeededItemsCh:
|
|
|
|
|
case task0, ok0 := <-s.neededItemsCh:
|
|
|
|
|
if !ok0 {
|
|
|
|
|
return nil, errors.New("drainColTask: cannot read from NeededColumnsCh, maybe the chan is closed")
|
|
|
|
|
return nil, errors.New("drainColTask: cannot read from NeededItemsCh, maybe the chan is closed")
|
|
|
|
|
}
|
|
|
|
|
// send task back to TimeoutColumnsCh and return the task drained from NeededColumnsCh
|
|
|
|
|
s.writeToTimeoutChan(s.StatsLoad.TimeoutItemsCh, task)
|
|
|
|
|
// send task back to TimeoutItemsCh and return the task drained from NeededItemsCh
|
|
|
|
|
s.writeToTimeoutChan(s.timeoutItemsCh, task)
|
|
|
|
|
return task0, nil
|
|
|
|
|
default:
|
|
|
|
|
if !ok {
|
|
|
|
|
return nil, errors.New("drainColTask: cannot read from TimeoutColumnsCh, maybe the chan is closed")
|
|
|
|
|
return nil, errors.New("drainColTask: cannot read from TimeoutItemsCh, maybe the chan is closed")
|
|
|
|
|
}
|
|
|
|
|
// NeededColumnsCh is empty now, handle task from TimeoutColumnsCh
|
|
|
|
|
// NeededItemsCh is empty now, handle task from TimeoutItemsCh
|
|
|
|
|
return task, nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|