statistics: enhance concurrency handling in stats initialization to reduce duplication (#60090)

ref pingcap/tidb#55043
This commit is contained in:
Rustin
2025-03-18 23:18:21 +08:00
committed by GitHub
parent 624888e22f
commit 19c2d3366e
3 changed files with 75 additions and 102 deletions

View File

@ -311,29 +311,6 @@ func (h *Handle) initStatsHistogramsLite(ctx context.Context, cache statstypes.S
return nil
}
func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache statstypes.StatsCache) error {
sql := genInitStatsHistogramsSQL(false)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsHistograms4Chunk(is, cache, iter, false)
}
return nil
}
func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache statstypes.StatsCache, task initstats.Task, totalMemory uint64) error {
se, err := h.Pool.SPool().Get()
if err != nil {
@ -371,12 +348,19 @@ func (h *Handle) initStatsHistogramsByPaging(is infoschema.InfoSchema, cache sta
return nil
}
func (h *Handle) initStatsHistogramsConcurrency(is infoschema.InfoSchema, cache statstypes.StatsCache, totalMemory uint64) error {
func (h *Handle) initStatsHistogramsConcurrently(is infoschema.InfoSchema, cache statstypes.StatsCache, totalMemory uint64, concurrency int) error {
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker("histogram", func(task initstats.Task) error {
return h.initStatsHistogramsByPaging(is, cache, task, totalMemory)
}, uint64(maxTid), uint64(initStatsStep), initStatsPercentageInterval)
ls := initstats.NewRangeWorker(
"histogram",
func(task initstats.Task) error {
return h.initStatsHistogramsByPaging(is, cache, task, totalMemory)
},
concurrency,
uint64(maxTid),
uint64(initStatsStep),
initStatsPercentageInterval,
)
ls.LoadStats()
for tid <= maxTid {
ls.SendTask(initstats.Task{
@ -440,29 +424,6 @@ func genInitStatsTopNSQLForIndexes(isPaging bool) string {
return selectPrefix + " and table_id >= %? and table_id < %?" + orderSuffix
}
func (h *Handle) initStatsTopN(cache statstypes.StatsCache, totalMemory uint64) error {
sql := genInitStatsTopNSQLForIndexes(false)
rc, err := util.Exec(h.initStatsCtx, sql)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsTopN4Chunk(cache, iter, totalMemory)
}
return nil
}
func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initstats.Task, totalMemory uint64) error {
se, err := h.Pool.SPool().Get()
if err != nil {
@ -499,18 +460,25 @@ func (h *Handle) initStatsTopNByPaging(cache statstypes.StatsCache, task initsta
return nil
}
func (h *Handle) initStatsTopNConcurrency(cache statstypes.StatsCache, totalMemory uint64) error {
func (h *Handle) initStatsTopNConcurrently(cache statstypes.StatsCache, totalMemory uint64, concurrency int) error {
if IsFullCacheFunc(cache, totalMemory) {
return nil
}
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker("TopN", func(task initstats.Task) error {
if IsFullCacheFunc(cache, totalMemory) {
return nil
}
return h.initStatsTopNByPaging(cache, task, totalMemory)
}, uint64(maxTid), uint64(initStatsStep), initStatsPercentageInterval)
ls := initstats.NewRangeWorker(
"TopN",
func(task initstats.Task) error {
if IsFullCacheFunc(cache, totalMemory) {
return nil
}
return h.initStatsTopNByPaging(cache, task, totalMemory)
},
concurrency,
uint64(maxTid),
uint64(initStatsStep),
initStatsPercentageInterval,
)
ls.LoadStats()
for tid <= maxTid {
if IsFullCacheFunc(cache, totalMemory) {
@ -632,30 +600,15 @@ func (h *Handle) initStatsBuckets(cache statstypes.StatsCache, totalMemory uint6
return nil
}
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err := h.initStatsBucketsConcurrency(cache, totalMemory)
err := h.initStatsBucketsConcurrently(cache, totalMemory, initstats.GetConcurrency())
if err != nil {
return errors.Trace(err)
}
} else {
sql := genInitStatsBucketsSQLForIndexes(false)
rc, err := util.Exec(h.initStatsCtx, sql)
err := h.initStatsBucketsConcurrently(cache, totalMemory, 1)
if err != nil {
return errors.Trace(err)
}
defer terror.Call(rc.Close)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
req := rc.NewChunk(nil)
iter := chunk.NewIterator4Chunk(req)
for {
err := rc.Next(ctx, req)
if err != nil {
return errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsBuckets4Chunk(cache, iter)
}
}
tables := cache.Values()
for _, table := range tables {
@ -701,18 +654,25 @@ func (h *Handle) initStatsBucketsByPaging(cache statstypes.StatsCache, task init
return nil
}
func (h *Handle) initStatsBucketsConcurrency(cache statstypes.StatsCache, totalMemory uint64) error {
func (h *Handle) initStatsBucketsConcurrently(cache statstypes.StatsCache, totalMemory uint64, concurrency int) error {
if IsFullCacheFunc(cache, totalMemory) {
return nil
}
var maxTid = maxTidRecord.tid.Load()
tid := int64(0)
ls := initstats.NewRangeWorker("bucket", func(task initstats.Task) error {
if IsFullCacheFunc(cache, totalMemory) {
return nil
}
return h.initStatsBucketsByPaging(cache, task)
}, uint64(maxTid), uint64(initStatsStep), initStatsPercentageInterval)
ls := initstats.NewRangeWorker(
"bucket",
func(task initstats.Task) error {
if IsFullCacheFunc(cache, totalMemory) {
return nil
}
return h.initStatsBucketsByPaging(cache, task)
},
concurrency,
uint64(maxTid),
uint64(initStatsStep),
initStatsPercentageInterval,
)
ls.LoadStats()
for tid <= maxTid {
ls.SendTask(initstats.Task{
@ -791,18 +751,18 @@ func (h *Handle) InitStats(ctx context.Context, is infoschema.InfoSchema) (err e
statslogutil.StatsLogger().Info("complete to load the meta")
initstats.InitStatsPercentage.Store(initStatsPercentageInterval)
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err = h.initStatsHistogramsConcurrency(is, cache, totalMemory)
err = h.initStatsHistogramsConcurrently(is, cache, totalMemory, initstats.GetConcurrency())
} else {
err = h.initStatsHistograms(is, cache)
err = h.initStatsHistogramsConcurrently(is, cache, totalMemory, 1)
}
statslogutil.StatsLogger().Info("complete to load the histogram")
if err != nil {
return errors.Trace(err)
}
if config.GetGlobalConfig().Performance.ConcurrentlyInitStats {
err = h.initStatsTopNConcurrency(cache, totalMemory)
err = h.initStatsTopNConcurrently(cache, totalMemory, initstats.GetConcurrency())
} else {
err = h.initStatsTopN(cache, totalMemory)
err = h.initStatsTopNConcurrently(cache, totalMemory, 1)
}
initstats.InitStatsPercentage.Store(initStatsPercentageInterval * 2)
statslogutil.StatsLogger().Info("complete to load the topn")

View File

@ -20,13 +20,13 @@ import (
"github.com/pingcap/tidb/pkg/config"
)
// getConcurrency gets the concurrency of loading stats.
// GetConcurrency gets the concurrency of loading stats.
// the concurrency is from 2 to 16.
// when Performance.ForceInitStats is true, the concurrency is from 2 to GOMAXPROCS(0)-2.
// -2 is to ensure that the system has enough resources to handle other tasks. such as GC and stats cache internal.
// when Performance.ForceInitStats is false, the concurrency is from 2 to GOMAXPROCS(0)/2.
// it is to ensure that concurrency doesn't affect the performance of customer's business.
func getConcurrency() int {
func GetConcurrency() int {
var concurrency int
if config.GetGlobalConfig().Performance.ForceInitStats {
concurrency = runtime.GOMAXPROCS(0) - 2

View File

@ -46,40 +46,53 @@ type Task struct {
}
// RangeWorker is used to load stats concurrently by the range of table id.
//
//nolint:fieldalignment
type RangeWorker struct {
dealFunc func(task Task) error
taskChan chan Task
logger *zap.Logger
taskName string
wg util.WaitGroupWrapper
taskCnt uint64
completeTaskCnt atomic.Uint64
progressLogger *zap.Logger
taskName string
taskChan chan Task
processTask func(task Task) error
taskCnt uint64
completeTaskCnt atomic.Uint64
totalPercentage float64
totalPercentageStep float64
concurrency int
wg util.WaitGroupWrapper
}
// NewRangeWorker creates a new RangeWorker.
func NewRangeWorker(taskName string, dealFunc func(task Task) error, maxTid, initStatsStep uint64, totalPercentageStep float64) *RangeWorker {
func NewRangeWorker(
taskName string,
processTask func(task Task) error,
concurrency int,
maxTid,
initStatsStep uint64,
totalPercentageStep float64,
) *RangeWorker {
taskCnt := uint64(1)
if maxTid > initStatsStep*2 {
taskCnt = maxTid / initStatsStep
}
worker := &RangeWorker{
taskName: taskName,
dealFunc: dealFunc,
processTask: processTask,
concurrency: concurrency,
taskChan: make(chan Task, 1),
taskCnt: taskCnt,
totalPercentage: InitStatsPercentage.Load(),
totalPercentageStep: totalPercentageStep,
}
worker.logger = singletonStatsSamplerLogger()
worker.progressLogger = singletonStatsSamplerLogger()
return worker
}
// LoadStats loads stats concurrently when to init stats
func (ls *RangeWorker) LoadStats() {
concurrency := getConcurrency()
for n := 0; n < concurrency; n++ {
for n := 0; n < ls.concurrency; n++ {
ls.wg.Run(func() {
ls.loadStats()
})
@ -88,14 +101,14 @@ func (ls *RangeWorker) LoadStats() {
func (ls *RangeWorker) loadStats() {
for task := range ls.taskChan {
if err := ls.dealFunc(task); err != nil {
if err := ls.processTask(task); err != nil {
logutil.BgLogger().Error("load stats failed", zap.Error(err))
}
if ls.logger != nil {
if ls.progressLogger != nil {
completeTaskCnt := ls.completeTaskCnt.Add(1)
taskPercentage := float64(completeTaskCnt)/float64(ls.taskCnt)*ls.totalPercentageStep + ls.totalPercentage
InitStatsPercentage.Store(taskPercentage)
ls.logger.Info(fmt.Sprintf("load %s [%d/%d]", ls.taskName, completeTaskCnt, ls.taskCnt))
ls.progressLogger.Info(fmt.Sprintf("load %s [%d/%d]", ls.taskName, completeTaskCnt, ls.taskCnt))
}
}
}