diff --git a/pkg/statistics/handle/bootstrap.go b/pkg/statistics/handle/bootstrap.go index 1896c745fc..6fc4575fa4 100644 --- a/pkg/statistics/handle/bootstrap.go +++ b/pkg/statistics/handle/bootstrap.go @@ -311,29 +311,6 @@ func (h *Handle) initStatsHistogramsLite(ctx context.Context, cache statstypes.S return nil } -func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache statstypes.StatsCache) error { - sql := genInitStatsHistogramsSQL(false) - rc, err := util.Exec(h.initStatsCtx, sql) - if err != nil { - return errors.Trace(err) - } - defer terror.Call(rc.Close) - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - req := rc.NewChunk(nil) - iter := chunk.NewIterator4Chunk(req) - for { - err := rc.Next(ctx, req) - if err != nil { - return errors.Trace(err) - } - if req.NumRows() == 0 { - break - } - h.initStatsHistograms4Chunk(is, cache, iter, false) - } - return nil -} - func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache statstypes.StatsCache, task initstats.Task, totalMemory uint64) error { se, err := h.Pool.SPool().Get() if err != nil { @@ -371,12 +348,19 @@ func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache sta return nil } -func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache statstypes.StatsCache, totalMemory uint64) error { +func (h *Handle) initStatsHistogramsConcurrently(is infoschema.InfoSchema, cache statstypes.StatsCache, totalMemory uint64, concurrency int) error { var maxTid = maxTidRecord.tid.Load() tid := int64(0) - ls := initstats.NewRangeWorker("histogram", func(task initstats.Task) error { - return h.initStatsHistogramsByPaging(is, cache, task, totalMemory) - }, uint64(maxTid), uint64(initStatsStep), initStatsPercentageInterval) + ls := initstats.NewRangeWorker( + "histogram", + func(task initstats.Task) error { + return h.initStatsHistogramsByPaging(is, cache, task, totalMemory) + }, + concurrency, + uint64(maxTid), + uint64(initStatsStep), + initStatsPercentageInterval, + ) ls.LoadStats() for tid <= maxTid { ls.SendTask(initstats.Task{ @@ -440,29 +424,6 @@ func genInitStatsTopNSQLForIndexes(isPaging bool) string { return selectPrefix + " and table_id >= %? and table_id < %?" + orderSuffix } -func (h *Handle) initStatsTopN(cache statstypes.StatsCache, totalMemory uint64) error { - sql := genInitStatsTopNSQLForIndexes(false) - rc, err := util.Exec(h.initStatsCtx, sql) - if err != nil { - return errors.Trace(err) - } - defer terror.Call(rc.Close) - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - req := rc.NewChunk(nil) - iter := chunk.NewIterator4Chunk(req) - for { - err := rc.Next(ctx, req) - if err != nil { - return errors.Trace(err) - } - if req.NumRows() == 0 { - break - } - h.initStatsTopN4Chunk(cache, iter, totalMemory) - } - return nil -} - func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initstats.Task, totalMemory uint64) error { se, err := h.Pool.SPool().Get() if err != nil { @@ -499,18 +460,25 @@ func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initsta return nil } -func (h *Handle) initStatsTopNConcurrency(cache statstypes.StatsCache, totalMemory uint64) error { +func (h *Handle) initStatsTopNConcurrently(cache statstypes.StatsCache, totalMemory uint64, concurrency int) error { if IsFullCacheFunc(cache, totalMemory) { return nil } var maxTid = maxTidRecord.tid.Load() tid := int64(0) - ls := initstats.NewRangeWorker("TopN", func(task initstats.Task) error { - if IsFullCacheFunc(cache, totalMemory) { - return nil - } - return h.initStatsTopNByPaging(cache, task, totalMemory) - }, uint64(maxTid), uint64(initStatsStep), initStatsPercentageInterval) + ls := initstats.NewRangeWorker( + "TopN", + func(task initstats.Task) error { + if IsFullCacheFunc(cache, totalMemory) { + return nil + } + return h.initStatsTopNByPaging(cache, task, totalMemory) + }, + concurrency, + uint64(maxTid), + uint64(initStatsStep), + initStatsPercentageInterval, + ) ls.LoadStats() for tid <= maxTid { if IsFullCacheFunc(cache, totalMemory) { @@ -632,30 +600,15 @@ func (h *Handle) initStatsBuckets(cache statstypes.StatsCache, totalMemory uint6 return nil } if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { - err := h.initStatsBucketsConcurrency(cache, totalMemory) + err := h.initStatsBucketsConcurrently(cache, totalMemory, initstats.GetConcurrency()) if err != nil { return errors.Trace(err) } } else { - sql := genInitStatsBucketsSQLForIndexes(false) - rc, err := util.Exec(h.initStatsCtx, sql) + err := h.initStatsBucketsConcurrently(cache, totalMemory, 1) if err != nil { return errors.Trace(err) } - defer terror.Call(rc.Close) - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - req := rc.NewChunk(nil) - iter := chunk.NewIterator4Chunk(req) - for { - err := rc.Next(ctx, req) - if err != nil { - return errors.Trace(err) - } - if req.NumRows() == 0 { - break - } - h.initStatsBuckets4Chunk(cache, iter) - } } tables := cache.Values() for _, table := range tables { @@ -701,18 +654,25 @@ func (h *Handle) initStatsBucketsByPaging(cache statstypes.StatsCache, task init return nil } -func (h *Handle) initStatsBucketsConcurrency(cache statstypes.StatsCache, totalMemory uint64) error { +func (h *Handle) initStatsBucketsConcurrently(cache statstypes.StatsCache, totalMemory uint64, concurrency int) error { if IsFullCacheFunc(cache, totalMemory) { return nil } var maxTid = maxTidRecord.tid.Load() tid := int64(0) - ls := initstats.NewRangeWorker("bucket", func(task initstats.Task) error { - if IsFullCacheFunc(cache, totalMemory) { - return nil - } - return h.initStatsBucketsByPaging(cache, task) - }, uint64(maxTid), uint64(initStatsStep), initStatsPercentageInterval) + ls := initstats.NewRangeWorker( + "bucket", + func(task initstats.Task) error { + if IsFullCacheFunc(cache, totalMemory) { + return nil + } + return h.initStatsBucketsByPaging(cache, task) + }, + concurrency, + uint64(maxTid), + uint64(initStatsStep), + initStatsPercentageInterval, + ) ls.LoadStats() for tid <= maxTid { ls.SendTask(initstats.Task{ @@ -791,18 +751,18 @@ func (h *Handle) InitStats(ctx context.Context, is infoschema.InfoSchema) (err e statslogutil.StatsLogger().Info("complete to load the meta") initstats.InitStatsPercentage.Store(initStatsPercentageInterval) if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { - err = h.initStatsHistogramsConcurrency(is, cache, totalMemory) + err = h.initStatsHistogramsConcurrently(is, cache, totalMemory, initstats.GetConcurrency()) } else { - err = h.initStatsHistograms(is, cache) + err = h.initStatsHistogramsConcurrently(is, cache, totalMemory, 1) } statslogutil.StatsLogger().Info("complete to load the histogram") if err != nil { return errors.Trace(err) } if config.GetGlobalConfig().Performance.ConcurrentlyInitStats { - err = h.initStatsTopNConcurrency(cache, totalMemory) + err = h.initStatsTopNConcurrently(cache, totalMemory, initstats.GetConcurrency()) } else { - err = h.initStatsTopN(cache, totalMemory) + err = h.initStatsTopNConcurrently(cache, totalMemory, 1) } initstats.InitStatsPercentage.Store(initStatsPercentageInterval * 2) statslogutil.StatsLogger().Info("complete to load the topn") diff --git a/pkg/statistics/handle/initstats/load_stats.go b/pkg/statistics/handle/initstats/load_stats.go index eeeed00049..111ff66297 100644 --- a/pkg/statistics/handle/initstats/load_stats.go +++ b/pkg/statistics/handle/initstats/load_stats.go @@ -20,13 +20,13 @@ import ( "github.com/pingcap/tidb/pkg/config" ) -// getConcurrency gets the concurrency of loading stats. +// GetConcurrency gets the concurrency of loading stats. // the concurrency is from 2 to 16. // when Performance.ForceInitStats is true, the concurrency is from 2 to GOMAXPROCS(0)-2. // -2 is to ensure that the system has enough resources to handle other tasks. such as GC and stats cache internal. // when Performance.ForceInitStats is false, the concurrency is from 2 to GOMAXPROCS(0)/2. // it is to ensure that concurrency doesn't affect the performance of customer's business. -func getConcurrency() int { +func GetConcurrency() int { var concurrency int if config.GetGlobalConfig().Performance.ForceInitStats { concurrency = runtime.GOMAXPROCS(0) - 2 diff --git a/pkg/statistics/handle/initstats/load_stats_page.go b/pkg/statistics/handle/initstats/load_stats_page.go index 59caac8e18..8a27ae4317 100644 --- a/pkg/statistics/handle/initstats/load_stats_page.go +++ b/pkg/statistics/handle/initstats/load_stats_page.go @@ -46,40 +46,53 @@ type Task struct { } // RangeWorker is used to load stats concurrently by the range of table id. +// +//nolint:fieldalignment type RangeWorker struct { - dealFunc func(task Task) error - taskChan chan Task - logger *zap.Logger - taskName string - wg util.WaitGroupWrapper - taskCnt uint64 - completeTaskCnt atomic.Uint64 + progressLogger *zap.Logger + + taskName string + taskChan chan Task + processTask func(task Task) error + taskCnt uint64 + completeTaskCnt atomic.Uint64 + totalPercentage float64 totalPercentageStep float64 + + concurrency int + wg util.WaitGroupWrapper } // NewRangeWorker creates a new RangeWorker. -func NewRangeWorker(taskName string, dealFunc func(task Task) error, maxTid, initStatsStep uint64, totalPercentageStep float64) *RangeWorker { +func NewRangeWorker( + taskName string, + processTask func(task Task) error, + concurrency int, + maxTid, + initStatsStep uint64, + totalPercentageStep float64, +) *RangeWorker { taskCnt := uint64(1) if maxTid > initStatsStep*2 { taskCnt = maxTid / initStatsStep } worker := &RangeWorker{ taskName: taskName, - dealFunc: dealFunc, + processTask: processTask, + concurrency: concurrency, taskChan: make(chan Task, 1), taskCnt: taskCnt, totalPercentage: InitStatsPercentage.Load(), totalPercentageStep: totalPercentageStep, } - worker.logger = singletonStatsSamplerLogger() + worker.progressLogger = singletonStatsSamplerLogger() return worker } // LoadStats loads stats concurrently when to init stats func (ls *RangeWorker) LoadStats() { - concurrency := getConcurrency() - for n := 0; n < concurrency; n++ { + for n := 0; n < ls.concurrency; n++ { ls.wg.Run(func() { ls.loadStats() }) @@ -88,14 +101,14 @@ func (ls *RangeWorker) LoadStats() { func (ls *RangeWorker) loadStats() { for task := range ls.taskChan { - if err := ls.dealFunc(task); err != nil { + if err := ls.processTask(task); err != nil { logutil.BgLogger().Error("load stats failed", zap.Error(err)) } - if ls.logger != nil { + if ls.progressLogger != nil { completeTaskCnt := ls.completeTaskCnt.Add(1) taskPercentage := float64(completeTaskCnt)/float64(ls.taskCnt)*ls.totalPercentageStep + ls.totalPercentage InitStatsPercentage.Store(taskPercentage) - ls.logger.Info(fmt.Sprintf("load %s [%d/%d]", ls.taskName, completeTaskCnt, ls.taskCnt)) + ls.progressLogger.Info(fmt.Sprintf("load %s [%d/%d]", ls.taskName, completeTaskCnt, ls.taskCnt)) } } }