132 lines
3.1 KiB
Go
132 lines
3.1 KiB
Go
// Copyright 2024 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package util
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
|
|
"github.com/ngaut/pools"
|
|
"github.com/pingcap/failpoint"
|
|
)
|
|
|
|
// SessionPool is a recyclable resource pool for the session.
|
|
type SessionPool interface {
|
|
Get() (pools.Resource, error)
|
|
Put(pools.Resource)
|
|
Close()
|
|
}
|
|
|
|
// DestroyableSessionPool is a session pool that can destroy the session resource.
|
|
// If the caller meets an error when using the session, it can destroy the session.
|
|
// See more by searching `StoreInternalSession`.
|
|
type DestroyableSessionPool interface {
|
|
SessionPool
|
|
Destroy(pools.Resource)
|
|
}
|
|
|
|
// resourceCallback is a helper function to be triggered after Get/Put call.
|
|
type resourceCallback func(pools.Resource)
|
|
|
|
type pool struct {
|
|
resources chan pools.Resource
|
|
factory pools.Factory
|
|
mu struct {
|
|
sync.RWMutex
|
|
closed bool
|
|
}
|
|
getCallback resourceCallback
|
|
putCallback resourceCallback
|
|
destroyCallback resourceCallback
|
|
}
|
|
|
|
// NewSessionPool creates a new session pool with the given capacity and factory function.
|
|
func NewSessionPool(capacity int, factory pools.Factory, getCallback, putCallback, destroyCallback resourceCallback) DestroyableSessionPool {
|
|
return &pool{
|
|
resources: make(chan pools.Resource, capacity),
|
|
factory: factory,
|
|
getCallback: getCallback,
|
|
putCallback: putCallback,
|
|
destroyCallback: destroyCallback,
|
|
}
|
|
}
|
|
|
|
// Get gets a session from the session pool.
|
|
func (p *pool) Get() (resource pools.Resource, err error) {
|
|
var ok bool
|
|
select {
|
|
case resource, ok = <-p.resources:
|
|
if !ok {
|
|
err = errors.New("session pool closed")
|
|
}
|
|
default:
|
|
resource, err = p.factory()
|
|
}
|
|
|
|
// Put the internal session to the map of Manager
|
|
failpoint.Inject("mockSessionPoolReturnError", func() {
|
|
err = errors.New("mockSessionPoolReturnError")
|
|
})
|
|
|
|
if err == nil && p.getCallback != nil {
|
|
p.getCallback(resource)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Put puts the session back to the pool.
|
|
func (p *pool) Put(resource pools.Resource) {
|
|
p.mu.RLock()
|
|
defer p.mu.RUnlock()
|
|
if p.putCallback != nil {
|
|
p.putCallback(resource)
|
|
}
|
|
if p.mu.closed {
|
|
resource.Close()
|
|
return
|
|
}
|
|
|
|
select {
|
|
case p.resources <- resource:
|
|
default:
|
|
resource.Close()
|
|
}
|
|
}
|
|
|
|
// Destroy destroys the session.
|
|
func (p *pool) Destroy(resource pools.Resource) {
|
|
if p.destroyCallback != nil {
|
|
p.destroyCallback(resource)
|
|
}
|
|
resource.Close()
|
|
}
|
|
|
|
// Close closes the pool to release all resources.
|
|
func (p *pool) Close() {
|
|
p.mu.Lock()
|
|
if p.mu.closed {
|
|
p.mu.Unlock()
|
|
return
|
|
}
|
|
p.mu.closed = true
|
|
close(p.resources)
|
|
p.mu.Unlock()
|
|
|
|
for r := range p.resources {
|
|
r.Close()
|
|
}
|
|
}
|