resourcemanager, util: add resourcemanager.Unregister when closing spmcpool (#40547)
close pingcap/tidb#40546
This commit is contained in:
@ -80,3 +80,8 @@ func (r *ResourceManager) Register(pool util.GorotinuePool, name string, compone
|
||||
func (r *ResourceManager) registerPool(name string, pool *util.PoolContainer) error {
|
||||
return r.poolMap.Add(name, pool)
|
||||
}
|
||||
|
||||
// Unregister is to unregister pool into resource manager.
|
||||
func (r *ResourceManager) Unregister(name string) {
|
||||
r.poolMap.Del(name)
|
||||
}
|
||||
|
||||
@ -45,6 +45,11 @@ func (s *ShardPoolMap) Add(key string, pool *PoolContainer) error {
|
||||
return s.pools[hash(key)].Add(key, pool)
|
||||
}
|
||||
|
||||
// Del deletes a pool to the map.
|
||||
func (s *ShardPoolMap) Del(key string) {
|
||||
s.pools[hash(key)].Del(key)
|
||||
}
|
||||
|
||||
// Iter iterates the map
|
||||
func (s *ShardPoolMap) Iter(fn func(pool *PoolContainer)) {
|
||||
for i := 0; i < shard; i++ {
|
||||
@ -71,6 +76,12 @@ func (p *poolMap) Add(key string, pool *PoolContainer) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *poolMap) Del(key string) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
delete(p.poolMap, key)
|
||||
}
|
||||
|
||||
func (p *poolMap) Iter(fn func(pool *PoolContainer)) {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
|
||||
@ -35,4 +35,16 @@ func TestShardPoolMap(t *testing.T) {
|
||||
cnt.Add(1)
|
||||
})
|
||||
require.Equal(t, rc, int(cnt.Load()))
|
||||
|
||||
for i := 0; i < rc; i++ {
|
||||
id := strconv.FormatInt(int64(i), 10)
|
||||
pm.Del(id)
|
||||
}
|
||||
cnt.Store(0)
|
||||
pm.Iter(func(pool *PoolContainer) {
|
||||
cnt.Add(1)
|
||||
})
|
||||
require.Equal(t, 0, int(cnt.Load()))
|
||||
id := strconv.FormatInt(int64(0), 10)
|
||||
pm.Del(id)
|
||||
}
|
||||
|
||||
@ -223,6 +223,7 @@ func (p *Pool[T, U, C, CT, TF]) ReleaseAndWait() {
|
||||
|
||||
close(p.stopCh)
|
||||
p.release()
|
||||
defer resourcemanager.GlobalResourceManager.Unregister(p.Name())
|
||||
for {
|
||||
// Wait for all workers to exit and all task to be completed.
|
||||
if p.Running() == 0 && p.heartbeatDone.Load() && p.waitingTask.Load() == 0 {
|
||||
|
||||
@ -69,6 +69,11 @@ func TestPool(t *testing.T) {
|
||||
require.Equal(t, uint32(10), count.Load())
|
||||
// close pool
|
||||
pool.ReleaseAndWait()
|
||||
|
||||
// test renew is normal
|
||||
pool, err = NewSPMCPool[int, int, ConstArgs, any, pooltask.NilContext]("TestPool", 10, rmutil.UNKNOWN)
|
||||
require.NoError(t, err)
|
||||
pool.ReleaseAndWait()
|
||||
}
|
||||
|
||||
func TestPoolWithEnoughCapacity(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user