Files
tidb/pkg/resourcemanager/pool/spool/spool_test.go
2025-05-07 13:37:02 +00:00

215 lines
5.9 KiB
Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package spool
import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/pingcap/tidb/pkg/resourcemanager/pool"
"github.com/pingcap/tidb/pkg/resourcemanager/util"
"github.com/stretchr/testify/require"
)
func demoFunc() {
f(2)
}
func f(n int) {
if n == 0 {
return
}
var useStack [100]byte
_ = useStack[3]
f(n - 1)
}
func TestReleaseWhenRunningPool(t *testing.T) {
var wg sync.WaitGroup
p, err := NewPool("TestReleaseWhenRunningPool", 1, util.UNKNOWN)
require.NoError(t, err)
wg.Add(2)
go func() {
defer wg.Done()
for range 30 {
_ = p.Run(func() {
time.Sleep(100 * time.Microsecond)
})
}
}()
go func() {
defer wg.Done()
for i := 100; i < 130; i++ {
_ = p.Run(func() {
time.Sleep(100 * time.Microsecond)
})
}
}()
time.Sleep(100 * time.Microsecond)
p.ReleaseAndWait()
wg.Wait()
}
func TestPoolTuneScaleUpAndDown(t *testing.T) {
c := make(chan struct{})
p, _ := NewPool("TestPoolTuneScaleUp", 2, util.UNKNOWN, WithBlocking(true))
for range 2 {
_ = p.Run(func() {
<-c
})
}
require.Equal(t, int32(2), p.Running())
// test pool tune scale up one
p.Tune(3)
_ = p.Run(func() {
<-c
})
require.Equal(t, int32(3), p.Running())
// test pool tune scale up multiple
var wg sync.WaitGroup
for range 5 {
wg.Add(1)
go func() {
defer wg.Done()
_ = p.Run(func() {
<-c
})
}()
}
p.Tune(8)
wg.Wait()
require.Eventually(t, func() bool { return p.Running() == 8 }, 1*time.Second, 200*time.Millisecond)
// test pool tune scale down
p.Tune(2)
for range 6 {
c <- struct{}{}
}
require.Eventually(t, func() bool { return p.Running() == 2 }, 1*time.Second, 200*time.Millisecond)
for range 2 {
c <- struct{}{}
}
require.Eventually(t, func() bool { return p.Running() == 0 }, 1*time.Second, 200*time.Millisecond)
// test with RunWithConcurrency
var cnt atomic.Int32
workerFn := func() {
cnt.Add(1)
}
fnChan := make(chan func(), 10)
wg.Wait()
err := p.RunWithConcurrency(fnChan, 2)
require.NoError(t, err)
require.Equal(t, int32(2), p.Running())
for range 10 {
fnChan <- workerFn
}
require.Eventually(t, func() bool { return cnt.Load() == 10 }, 1*time.Second, 200*time.Millisecond)
require.Equal(t, int32(2), p.Running())
close(fnChan)
require.Eventually(t, func() bool { return p.Running() == 0 }, 1*time.Second, 200*time.Millisecond)
p.ReleaseAndWait()
}
func TestRunOverload(t *testing.T) {
var stop atomic.Bool
longRunningFunc := func() {
for {
if stop.Load() {
break
}
runtime.Gosched()
}
}
poolSize := 10
p, err := NewPool("TestMaxBlockingSubmit", int32(poolSize), util.UNKNOWN, WithBlocking(false))
require.NoErrorf(t, err, "create TimingPool failed: %v", err)
defer p.ReleaseAndWait()
defer stop.Store(true)
for range poolSize - 1 {
require.NoError(t, p.Run(longRunningFunc), "submit when pool is not full shouldn't return error")
}
// p is full now.
require.NoError(t, p.Run(longRunningFunc), "submit when pool is not full shouldn't return error")
require.EqualError(t, p.Run(demoFunc), pool.ErrPoolOverload.Error(),
"blocking submit when pool reach max blocking submit should return ErrPoolOverload")
}
func TestRunWithNotEnough(t *testing.T) {
var stop atomic.Bool
fnChan := make(chan func(), 10)
poolSize := 10
p, err := NewPool("TestRunWithNotEnough", int32(poolSize), util.UNKNOWN, WithBlocking(false))
require.NoErrorf(t, err, "create TimingPool failed: %v", err)
defer p.ReleaseAndWait()
defer stop.Store(true)
require.NoError(t, p.RunWithConcurrency(fnChan, uint32(poolSize+100)), "submit when pool is not full shouldn't return error")
require.Equal(t, int32(10), p.Running())
require.Error(t, p.RunWithConcurrency(fnChan, 1))
require.Error(t, p.Run(func() {}))
close(fnChan)
time.Sleep(1 * time.Second)
require.Equal(t, int32(0), p.Running())
}
func TestRunWithNotEnough2(t *testing.T) {
fnChan := make(chan func(), 10)
var cnt atomic.Int32
fn := func() {
cnt.Add(1)
}
p, err := NewPool("TestRunWithNotEnough2", int32(1), util.UNKNOWN, WithBlocking(false))
require.NoErrorf(t, err, "create TimingPool failed: %v", err)
defer p.ReleaseAndWait()
require.NoError(t, p.RunWithConcurrency(fnChan, 2), "submit when pool is not full shouldn't return error")
require.Equal(t, int32(1), p.Running())
require.Error(t, p.RunWithConcurrency(fnChan, 1))
require.Error(t, p.Run(func() {}))
for range 100 {
fnChan <- fn
}
close(fnChan)
time.Sleep(100 * time.Microsecond)
require.Equal(t, int32(0), p.Running())
require.Equal(t, int32(100), cnt.Load())
}
func TestWithTaskManager(t *testing.T) {
p, err := NewPool("TestWithTaskManager", int32(1), util.UNKNOWN, WithBlocking(false))
require.NoError(t, err)
defer p.ReleaseAndWait()
fnChan := make(chan func(), 10)
require.NoError(t, p.RunWithConcurrency(fnChan, 2), "submit when pool is not full shouldn't return error")
time.Sleep(100 * time.Microsecond)
require.Equal(t, int32(1), p.Running())
// increase the concurrency
p.Tune(2)
time.Sleep(100 * time.Microsecond)
require.Equal(t, int32(2), p.Running())
p.Tune(3)
require.Eventually(t, func() bool { return p.Running() == 3 }, 1*time.Second, 200*time.Millisecond)
// decrease the concurrency
p.Tune(2)
require.Eventually(t, func() bool { return p.Running() == 2 }, 1*time.Second, 200*time.Millisecond)
p.Tune(1)
require.Eventually(t, func() bool { return p.Running() == 1 }, 1*time.Second, 200*time.Millisecond)
close(fnChan)
}