// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package spool

import (
	"sync"
	"sync/atomic"
	"time"

	"github.com/pingcap/tidb/pkg/metrics"
	"github.com/pingcap/tidb/pkg/resourcemanager"
	"github.com/pingcap/tidb/pkg/resourcemanager/pool"
	"github.com/pingcap/tidb/pkg/resourcemanager/poolmanager"
	"github.com/pingcap/tidb/pkg/resourcemanager/util"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sasha-s/go-deadlock"
	"go.uber.org/zap"
)

const waitInterval = 5 * time.Millisecond

// Pool is a single goroutine pool. It does not reuse goroutines.
type Pool struct {
wg sync.WaitGroup
mu deadlock.RWMutex
options *Options
originCapacity int32
capacity int32
running atomic.Int32
waiting atomic.Int32
isStop atomic.Bool
condMu sync.Mutex
cond sync.Cond
concurrencyMetrics prometheus.Gauge
taskManager poolmanager.TaskManager
pool.BasePool
}

// NewPool creates a single goroutine pool.
func NewPool(name string, size int32, component util.Component, options ...Option) (*Pool, error) {
opts := loadOptions(options...)
result := &Pool{
options: opts,
concurrencyMetrics: metrics.PoolConcurrencyCounter.WithLabelValues(name),
taskManager: poolmanager.NewTaskManager(size),
}
result.cond = *sync.NewCond(&result.condMu)
if size == 0 {
return nil, pool.ErrPoolParamsInvalid
}
result.SetName(name)
result.capacity = size
result.originCapacity = size
result.concurrencyMetrics.Set(float64(size))
err := resourcemanager.InstanceResourceManager.Register(result, name, component)
if err != nil {
return nil, err
}
return result, nil
}
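
// Illustrative sketch (not part of the original file): how a caller might
// construct a pool, submit a task, and shut the pool down. The pool name and
// capacity are arbitrary, and the component is taken as a parameter so no
// particular util.Component constant is assumed.
func exampleBasicUsage(component util.Component) error {
	p, err := NewPool("spool-example", 4, component)
	if err != nil {
		return err
	}
	// ReleaseAndWait blocks until all submitted tasks finish, then unregisters
	// the pool from the resource manager.
	defer p.ReleaseAndWait()
	return p.Run(func() {
		// The task body runs on one of the pool's goroutines.
	})
}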

// Tune changes the pool size.
func (p *Pool) Tune(size int32) {
if size == 0 {
return
}
p.mu.Lock()
defer p.mu.Unlock()
p.SetLastTuneTs(time.Now())
old := p.capacity
p.capacity = size
p.concurrencyMetrics.Set(float64(size))
if old == size {
return
}
if old < size && p.running.Load() < size {
_, task := p.taskManager.Overclock()
if task != nil {
p.running.Add(1)
p.run(func() {
runTask(task)
})
}
return
}
if p.running.Load() > size {
p.taskManager.Downclock()
}
}
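
// Illustrative sketch (not part of the original file): growing the pool under
// load and later restoring the capacity it was created with.
func exampleTune(p *Pool) {
	p.Tune(p.Cap() * 2)              // temporarily allow twice as many goroutines
	p.Tune(p.GetOriginConcurrency()) // restore the initial capacity
}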

// Run runs a function in the pool. It returns pool.ErrPoolClosed if the pool
// has been released, or pool.ErrPoolOverload if the pool is full and is not
// configured to block.
func (p *Pool) Run(fn func()) error {
p.waiting.Add(1)
defer p.cond.Signal()
defer p.waiting.Add(-1)
if p.isStop.Load() {
return pool.ErrPoolClosed
}
_, run := p.checkAndAddRunning(1)
if !run {
return pool.ErrPoolOverload
}
p.run(fn)
return nil
}
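
// Illustrative sketch (not part of the original file): when the pool is
// non-blocking and saturated, Run returns pool.ErrPoolOverload, and a caller
// can fall back to running the task on its own goroutine.
func exampleRunWithFallback(p *Pool, task func()) error {
	err := p.Run(task)
	if err == pool.ErrPoolOverload {
		// No free capacity; run the task synchronously instead.
		task()
		return nil
	}
	return err
}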

// Cap returns the capacity of the pool.
func (p *Pool) Cap() int32 {
p.mu.RLock()
defer p.mu.RUnlock()
return p.capacity
}

// Running returns the number of running goroutines.
func (p *Pool) Running() int32 {
p.mu.RLock()
defer p.mu.RUnlock()
return p.running.Load()
}

// run starts fn on a new goroutine. It recovers from panics and decrements
// the running counter when fn finishes.
func (p *Pool) run(fn func()) {
p.wg.Add(1)
go func() {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Error("recover panic", zap.Any("recover", r), zap.Stack("stack"))
}
p.wg.Done()
p.running.Add(-1)
}()
fn()
}()
}

// RunWithConcurrency starts up to concurrency goroutines that consume
// functions from fns. Fewer goroutines may be started if the pool does not
// have enough free capacity.
func (p *Pool) RunWithConcurrency(fns chan func(), concurrency uint32) error {
p.waiting.Add(1)
defer p.cond.Signal()
defer p.waiting.Add(-1)
if p.isStop.Load() {
return pool.ErrPoolClosed
}
conc, run := p.checkAndAddRunning(concurrency)
if !run {
return pool.ErrPoolOverload
}
exitCh := make(chan struct{}, 1)
meta := poolmanager.NewMeta(p.GenTaskID(), exitCh, fns, int32(concurrency))
p.taskManager.RegisterTask(meta)
for range conc {
p.run(func() {
runTask(meta)
})
}
return nil
}
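
// Illustrative sketch (not part of the original file; channel size and task
// count are arbitrary): feeding tasks through one channel and letting the pool
// drain it with a bounded number of goroutines.
func exampleRunWithConcurrency(p *Pool) error {
	fns := make(chan func(), 8)
	// Ask the pool for up to two goroutines to consume fns.
	if err := p.RunWithConcurrency(fns, 2); err != nil {
		return err
	}
	for i := 0; i < 8; i++ {
		fns <- func() {
			// The task body runs on one of the pooled goroutines.
		}
	}
	// Closing the channel lets the pooled goroutines exit once it is drained.
	close(fns)
	return nil
}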

// checkAndAddRunning reports whether a task may run. If it may, the running
// counter is increased by the granted concurrency before returning.
func (p *Pool) checkAndAddRunning(concurrency uint32) (conc int32, run bool) {
for {
if p.isStop.Load() {
return 0, false
}
p.mu.Lock()
value, run := p.checkAndAddRunningInternal(int32(concurrency))
if run {
p.mu.Unlock()
return value, run
}
if !p.options.Blocking {
p.mu.Unlock()
return 0, false
}
p.mu.Unlock()
time.Sleep(waitInterval)
}
}

// checkAndAddRunningInternal grants up to concurrency goroutines, bounded by
// the remaining capacity (capacity - running). It must be called with p.mu
// held. For example, with capacity 4 and 3 goroutines already running, a
// request for 5 is granted 1.
func (p *Pool) checkAndAddRunningInternal(concurrency int32) (conc int32, run bool) {
n := p.capacity - p.running.Load()
if n <= 0 {
return 0, false
}
	// If concurrency is 1, we must grant exactly one goroutine.
	// If concurrency is greater than 1, we must grant at least one goroutine.
	result := min(n, concurrency)
p.running.Add(result)
return result, true
}

// ReleaseAndWait releases the pool and waits for all tasks to be completed.
func (p *Pool) ReleaseAndWait() {
	p.isStop.Store(true)
	// Wait until every in-flight submission (Run / RunWithConcurrency) has returned.
p.cond.L.Lock()
for p.waiting.Load() > 0 {
p.cond.Wait()
}
p.cond.L.Unlock()
p.wg.Wait()
resourcemanager.InstanceResourceManager.Unregister(p.Name())
}

// runTask executes functions received from the task's channel until the
// channel is closed or the task's exit channel is signalled.
func runTask(task *poolmanager.Meta) {
taskCh := task.GetTaskCh()
exitCh := task.GetExitCh()
task.IncTask()
defer task.DecTask()
for {
select {
case f, ok := <-taskCh:
if !ok {
return
}
f()
case <-exitCh:
return
}
}
}

// GetOriginConcurrency returns the concurrency the pool was created with.
func (p *Pool) GetOriginConcurrency() int32 {
return p.originCapacity
}