diff --git a/resourcemanager/rm.go b/resourcemanager/rm.go index 35ac6afff9..aa2af6f949 100644 --- a/resourcemanager/rm.go +++ b/resourcemanager/rm.go @@ -80,3 +80,8 @@ func (r *ResourceManager) Register(pool util.GorotinuePool, name string, compone func (r *ResourceManager) registerPool(name string, pool *util.PoolContainer) error { return r.poolMap.Add(name, pool) } + +// Unregister is to unregister pool into resource manager. +func (r *ResourceManager) Unregister(name string) { + r.poolMap.Del(name) +} diff --git a/resourcemanager/util/shard_pool_map.go b/resourcemanager/util/shard_pool_map.go index 8819c56ed3..371365af03 100644 --- a/resourcemanager/util/shard_pool_map.go +++ b/resourcemanager/util/shard_pool_map.go @@ -45,6 +45,11 @@ func (s *ShardPoolMap) Add(key string, pool *PoolContainer) error { return s.pools[hash(key)].Add(key, pool) } +// Del deletes a pool to the map. +func (s *ShardPoolMap) Del(key string) { + s.pools[hash(key)].Del(key) +} + // Iter iterates the map func (s *ShardPoolMap) Iter(fn func(pool *PoolContainer)) { for i := 0; i < shard; i++ { @@ -71,6 +76,12 @@ func (p *poolMap) Add(key string, pool *PoolContainer) error { return nil } +func (p *poolMap) Del(key string) { + p.mu.Lock() + defer p.mu.Unlock() + delete(p.poolMap, key) +} + func (p *poolMap) Iter(fn func(pool *PoolContainer)) { p.mu.RLock() defer p.mu.RUnlock() diff --git a/resourcemanager/util/shard_pool_map_test.go b/resourcemanager/util/shard_pool_map_test.go index 34e0a11ca5..bb09e2fbd8 100644 --- a/resourcemanager/util/shard_pool_map_test.go +++ b/resourcemanager/util/shard_pool_map_test.go @@ -35,4 +35,16 @@ func TestShardPoolMap(t *testing.T) { cnt.Add(1) }) require.Equal(t, rc, int(cnt.Load())) + + for i := 0; i < rc; i++ { + id := strconv.FormatInt(int64(i), 10) + pm.Del(id) + } + cnt.Store(0) + pm.Iter(func(pool *PoolContainer) { + cnt.Add(1) + }) + require.Equal(t, 0, int(cnt.Load())) + id := strconv.FormatInt(int64(0), 10) + pm.Del(id) } diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go index 0f81d86448..b8cecb289c 100644 --- a/util/gpool/spmc/spmcpool.go +++ b/util/gpool/spmc/spmcpool.go @@ -223,6 +223,7 @@ func (p *Pool[T, U, C, CT, TF]) ReleaseAndWait() { close(p.stopCh) p.release() + defer resourcemanager.GlobalResourceManager.Unregister(p.Name()) for { // Wait for all workers to exit and all task to be completed. if p.Running() == 0 && p.heartbeatDone.Load() && p.waitingTask.Load() == 0 { diff --git a/util/gpool/spmc/spmcpool_test.go b/util/gpool/spmc/spmcpool_test.go index bc9a197815..cef958e52a 100644 --- a/util/gpool/spmc/spmcpool_test.go +++ b/util/gpool/spmc/spmcpool_test.go @@ -69,6 +69,11 @@ func TestPool(t *testing.T) { require.Equal(t, uint32(10), count.Load()) // close pool pool.ReleaseAndWait() + + // test renew is normal + pool, err = NewSPMCPool[int, int, ConstArgs, any, pooltask.NilContext]("TestPool", 10, rmutil.UNKNOWN) + require.NoError(t, err) + pool.ReleaseAndWait() } func TestPoolWithEnoughCapacity(t *testing.T) {