332 lines
8.9 KiB
Go
332 lines
8.9 KiB
Go
// Copyright 2021 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package membuf
|
|
|
|
import "unsafe"
|
|
|
|
// Defaults that NewPool applies when no Option overrides them.
const (
	// defaultPoolSize is the default capacity, in blocks, of a Pool's
	// block cache.
	defaultPoolSize = 1 << 10
	// defaultBlockSize is the default size in bytes of one block (1 MiB).
	defaultBlockSize = 1024 * 1024
)
|
|
|
|
// Allocator abstracts how raw memory is obtained and handed back.
type Allocator interface {
	// Alloc returns a byte slice for a request of n bytes.
	Alloc(n int) []byte
	// Free gives a previously allocated slice back to the allocator.
	Free(b []byte)
}
|
|
|
|
// stdAllocator is an allocator backed by the Go runtime: memory is obtained
// with make and nothing is done to give it back explicitly.
type stdAllocator struct{}

// Alloc returns a newly made slice of n bytes.
func (stdAllocator) Alloc(n int) []byte {
	buf := make([]byte, n)
	return buf
}

// Free is a no-op.
func (stdAllocator) Free([]byte) {}
|
|
|
|
// Pool is like `sync.Pool`, which manages memory for all bytes buffers. You can
|
|
// use Pool.NewBuffer to create a new buffer, and use Buffer.Destroy to release
|
|
// its memory to the pool. Pool can provide fixed size []byte blocks to Buffer.
|
|
//
|
|
// NOTE: we don't used a `sync.Pool` because when will sync.Pool release is depending on the
|
|
// garbage collector which always release the memory so late. Use a fixed size chan to reuse
|
|
// can decrease the memory usage to 1/3 compare with sync.Pool.
|
|
type Pool struct {
|
|
allocator Allocator
|
|
blockSize int
|
|
blockCache chan []byte
|
|
limiter *Limiter
|
|
}
|
|
|
|
// Option configures a pool.
|
|
type Option func(p *Pool)
|
|
|
|
// WithBlockNum configures how many blocks cached by this pool.
|
|
func WithBlockNum(num int) Option {
|
|
return func(p *Pool) {
|
|
p.blockCache = make(chan []byte, num)
|
|
}
|
|
}
|
|
|
|
// WithBlockSize configures the size of each block.
|
|
func WithBlockSize(bytes int) Option {
|
|
return func(p *Pool) {
|
|
p.blockSize = bytes
|
|
}
|
|
}
|
|
|
|
// WithAllocator specifies the allocator used by pool to allocate and free memory.
|
|
func WithAllocator(allocator Allocator) Option {
|
|
return func(p *Pool) {
|
|
p.allocator = allocator
|
|
}
|
|
}
|
|
|
|
// WithPoolMemoryLimiter controls the maximum memory returned to buffer. Note
|
|
// that when call AllocBytes with size larger than blockSize, the memory is not
|
|
// controlled by this limiter.
|
|
func WithPoolMemoryLimiter(limiter *Limiter) Option {
|
|
return func(p *Pool) {
|
|
p.limiter = limiter
|
|
}
|
|
}
|
|
|
|
// NewPool creates a new pool.
|
|
func NewPool(opts ...Option) *Pool {
|
|
p := &Pool{
|
|
allocator: stdAllocator{},
|
|
blockSize: defaultBlockSize,
|
|
blockCache: make(chan []byte, defaultPoolSize),
|
|
}
|
|
for _, opt := range opts {
|
|
opt(p)
|
|
}
|
|
return p
|
|
}
|
|
|
|
func (p *Pool) acquire() []byte {
|
|
if p.limiter != nil {
|
|
p.limiter.Acquire(p.blockSize)
|
|
}
|
|
select {
|
|
case b := <-p.blockCache:
|
|
return b
|
|
default:
|
|
return p.allocator.Alloc(p.blockSize)
|
|
}
|
|
}
|
|
|
|
func (p *Pool) release(b []byte) {
|
|
select {
|
|
case p.blockCache <- b:
|
|
default:
|
|
p.allocator.Free(b)
|
|
}
|
|
if p.limiter != nil {
|
|
p.limiter.Release(p.blockSize)
|
|
}
|
|
}
|
|
|
|
// Destroy frees all buffers.
|
|
func (p *Pool) Destroy() {
|
|
close(p.blockCache)
|
|
for b := range p.blockCache {
|
|
p.allocator.Free(b)
|
|
}
|
|
}
|
|
|
|
// TotalSize is the total memory size of this Pool, not considering its Buffer.
|
|
func (p *Pool) TotalSize() int64 {
|
|
return int64(len(p.blockCache) * p.blockSize)
|
|
}
|
|
|
|
// Buffer represents a buffer that can allocate []byte from its memory.
|
|
type Buffer struct {
|
|
pool *Pool
|
|
blocks [][]byte
|
|
blockCntLimit int
|
|
curBlock []byte
|
|
curBlockIdx int
|
|
curIdx int
|
|
|
|
smallObjOverhead int
|
|
smallObjOverheadCache int
|
|
}
|
|
|
|
// BufferOption configures a buffer.
|
|
type BufferOption func(*Buffer)
|
|
|
|
// WithBufferMemoryLimit approximately limits the maximum memory size of this
|
|
// Buffer. Due to it use blocks to allocate memory, the actual memory size is
|
|
// blockSize*ceil(limit/blockSize).
|
|
func WithBufferMemoryLimit(limit uint64) BufferOption {
|
|
return func(b *Buffer) {
|
|
blockCntLimit := int(getBlockCnt(limit, uint64(b.pool.blockSize)))
|
|
b.blockCntLimit = blockCntLimit
|
|
b.blocks = make([][]byte, 0, blockCntLimit)
|
|
}
|
|
}
|
|
|
|
// GetAlignedSize returns the size after aligned by blockSize.
|
|
func GetAlignedSize(size, blockSize uint64) uint64 {
|
|
return getBlockCnt(size, blockSize) * blockSize
|
|
}
|
|
|
|
// getBlockCnt returns ceil(size/blockSize): the number of blockSize-sized
// blocks needed to hold size bytes. blockSize must be non-zero.
//
// The result is built from the quotient and remainder rather than
// (size+blockSize-1)/blockSize, because that sum wraps around for sizes close
// to the maximum uint64 and would yield a count that is far too small.
func getBlockCnt(size, blockSize uint64) uint64 {
	cnt := size / blockSize
	if size%blockSize != 0 {
		cnt++
	}
	return cnt
}
|
|
|
|
// NewBuffer creates a new buffer in current pool. The buffer can gradually
|
|
// acquire memory from the pool and release all memory once it's not used.
|
|
func (p *Pool) NewBuffer(opts ...BufferOption) *Buffer {
|
|
b := &Buffer{
|
|
pool: p,
|
|
curBlockIdx: -1,
|
|
blockCntLimit: -1,
|
|
}
|
|
for _, opt := range opts {
|
|
opt(b)
|
|
}
|
|
if b.blocks == nil {
|
|
b.blocks = make([][]byte, 0, 128)
|
|
}
|
|
return b
|
|
}
|
|
|
|
// smallObjOverheadBatch is how many bytes are acquired from the limiter at a
// time to cover small-object overhead. 256KB is enough for 256KB/24B = 10922
// []byte objects, or 256KB/12B = 21845 SliceLocation objects.
const smallObjOverheadBatch = 256 << 10
|
|
|
|
// recordSmallObjOverhead records the memory cost of []byte or SliceLocation into
|
|
// pool's limiter. The caller will ensure the pool's limiter is not nil.
|
|
func (b *Buffer) recordSmallObjOverhead(n int) {
|
|
if n > b.smallObjOverheadCache {
|
|
b.pool.limiter.Acquire(smallObjOverheadBatch)
|
|
b.smallObjOverheadCache += smallObjOverheadBatch
|
|
b.smallObjOverhead += smallObjOverheadBatch
|
|
}
|
|
b.smallObjOverheadCache -= n
|
|
}
|
|
|
|
// releaseSmallObjOverhead releases the memory cost of []byte or SliceLocation
|
|
// that are acquired from this Buffer before to the pool's limiter. The caller
|
|
// will ensure the pool's limiter is not nil.
|
|
func (b *Buffer) releaseSmallObjOverhead() {
|
|
b.pool.limiter.Release(b.smallObjOverhead)
|
|
b.smallObjOverhead = 0
|
|
b.smallObjOverheadCache = 0
|
|
}
|
|
|
|
// Reset resets the buffer, the memory is still retained in this buffer. Caller
|
|
// must release the reference to the returned []byte or SliceLocation before
|
|
// calling Reset.
|
|
func (b *Buffer) Reset() {
|
|
if b.pool.limiter != nil {
|
|
b.releaseSmallObjOverhead()
|
|
}
|
|
if len(b.blocks) > 0 {
|
|
b.curBlock = b.blocks[0]
|
|
b.curBlockIdx = 0
|
|
b.curIdx = 0
|
|
}
|
|
}
|
|
|
|
// Destroy releases all buffers to the pool. Caller must release the reference to
|
|
// the returned []byte or SliceLocation before calling Destroy.
|
|
func (b *Buffer) Destroy() {
|
|
if b.pool.limiter != nil {
|
|
b.releaseSmallObjOverhead()
|
|
}
|
|
for _, buf := range b.blocks {
|
|
b.pool.release(buf)
|
|
}
|
|
b.blocks = nil
|
|
b.curBlock = nil
|
|
b.curBlockIdx = -1
|
|
b.curIdx = 0
|
|
}
|
|
|
|
// TotalSize represents the total memory size of this Buffer.
|
|
func (b *Buffer) TotalSize() int64 {
|
|
return int64(len(b.blocks) * b.pool.blockSize)
|
|
}
|
|
|
|
// sizeOfSlice is the size in bytes of a []byte slice header.
var sizeOfSlice = int(unsafe.Sizeof([]byte(nil)))
|
|
|
|
// AllocBytes allocates bytes with the given length.
|
|
func (b *Buffer) AllocBytes(n int) []byte {
|
|
if n > b.pool.blockSize {
|
|
return make([]byte, n)
|
|
}
|
|
|
|
bs, _ := b.allocBytesWithSliceLocation(n)
|
|
if bs != nil && b.pool.limiter != nil {
|
|
b.recordSmallObjOverhead(sizeOfSlice)
|
|
}
|
|
return bs
|
|
}
|
|
|
|
// SliceLocation resembles a reflect.SliceHeader, but it is tied to a Buffer.
// It is smaller than a slice and contains no pointer, which makes it more
// GC-friendly.
type SliceLocation struct {
	// bufIdx is the index of the block within the Buffer.
	bufIdx int32
	// offset is where the slice starts inside that block.
	offset int32
	// Length is the length of the slice in bytes.
	Length int32
}

// sizeOfSliceLocation is the size in bytes of one SliceLocation value.
var sizeOfSliceLocation = int(unsafe.Sizeof(SliceLocation{}))
|
|
|
|
func (b *Buffer) allocBytesWithSliceLocation(n int) ([]byte, SliceLocation) {
|
|
if n > b.pool.blockSize {
|
|
return nil, SliceLocation{}
|
|
}
|
|
|
|
if b.curIdx+n > len(b.curBlock) {
|
|
if b.blockCntLimit >= 0 && b.curBlockIdx+1 >= b.blockCntLimit {
|
|
return nil, SliceLocation{}
|
|
}
|
|
b.addBlock()
|
|
}
|
|
blockIdx := int32(b.curBlockIdx)
|
|
offset := int32(b.curIdx)
|
|
loc := SliceLocation{bufIdx: blockIdx, offset: offset, Length: int32(n)}
|
|
|
|
idx := b.curIdx
|
|
b.curIdx += n
|
|
return b.curBlock[idx:b.curIdx:b.curIdx], loc
|
|
}
|
|
|
|
// AllocBytesWithSliceLocation is like AllocBytes, but it must allocate the
|
|
// buffer in the pool rather from go's runtime. The expected usage is after
|
|
// writing data into returned slice **we do not store the slice**, but only the
|
|
// SliceLocation. Later we can use the SliceLocation to get the slice again. When
|
|
// we have a large number of slices in memory this can reduce memory occupation.
|
|
// nil returned slice means allocation failed.
|
|
func (b *Buffer) AllocBytesWithSliceLocation(n int) ([]byte, SliceLocation) {
|
|
bs, loc := b.allocBytesWithSliceLocation(n)
|
|
if bs != nil && b.pool.limiter != nil {
|
|
b.recordSmallObjOverhead(sizeOfSliceLocation)
|
|
}
|
|
return bs, loc
|
|
}
|
|
|
|
func (b *Buffer) addBlock() {
|
|
if b.curBlockIdx < len(b.blocks)-1 {
|
|
b.curBlockIdx++
|
|
b.curBlock = b.blocks[b.curBlockIdx]
|
|
} else {
|
|
block := b.pool.acquire()
|
|
b.blocks = append(b.blocks, block)
|
|
b.curBlock = block
|
|
b.curBlockIdx = len(b.blocks) - 1
|
|
}
|
|
|
|
b.curIdx = 0
|
|
}
|
|
|
|
// GetSlice returns the byte slice for the slice location.
|
|
func (b *Buffer) GetSlice(loc *SliceLocation) []byte {
|
|
return b.blocks[loc.bufIdx][loc.offset : loc.offset+loc.Length]
|
|
}
|
|
|
|
// AddBytes adds the bytes into this Buffer's managed memory and return it.
|
|
func (b *Buffer) AddBytes(bytes []byte) []byte {
|
|
buf := b.AllocBytes(len(bytes))
|
|
copy(buf, bytes)
|
|
return buf
|
|
}
|