Files
tidb/pkg/lightning/membuf/buffer.go

332 lines
8.9 KiB
Go

// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package membuf
import "unsafe"
// Defaults used by NewPool when no Option overrides them.
const (
	// defaultPoolSize is the capacity of a Pool's free-block cache.
	defaultPoolSize = 1024

	// defaultBlockSize is the size of each block handed out by a Pool (1 MiB).
	defaultBlockSize = 1024 * 1024
)
// Allocator is the abstract interface for allocating and freeing memory.
type Allocator interface {
	// Alloc returns a byte slice of length n.
	Alloc(n int) []byte
	// Free releases a slice previously obtained from Alloc.
	Free(buf []byte)
}

// stdAllocator is the default Allocator. It takes memory from the Go runtime
// and leaves reclamation to the garbage collector, so Free does nothing.
type stdAllocator struct{}

// Alloc returns a freshly made, zeroed slice of n bytes.
func (stdAllocator) Alloc(n int) []byte {
	buf := make([]byte, n)
	return buf
}

// Free is a no-op: the garbage collector reclaims the slice once it is no
// longer referenced.
func (stdAllocator) Free([]byte) {}
// Pool is like `sync.Pool`: it manages memory for many byte buffers. Use
// Pool.NewBuffer to create a Buffer, and Buffer.Destroy to return the
// Buffer's memory to the pool. A Pool hands out fixed-size []byte blocks to
// its Buffers.
//
// NOTE: we don't use a `sync.Pool` because when a sync.Pool releases memory
// depends on the garbage collector, which tends to release it too late.
// Reusing blocks through a fixed-size channel can reduce memory usage to
// about 1/3 of what sync.Pool uses.
type Pool struct {
	// allocator allocates new blocks, and frees blocks that do not fit back
	// into blockCache.
	allocator Allocator

	// blockSize is the size in bytes of every block handed out by acquire.
	blockSize int

	// blockCache holds free blocks for reuse; its capacity bounds how many
	// free blocks the pool retains.
	blockCache chan []byte

	// limiter, when non-nil, is charged blockSize for every block handed to a
	// Buffer (credited back on release), and also for the small-object
	// overhead that Buffers record.
	limiter *Limiter
}
// Option customizes a Pool; options are passed to NewPool.
type Option func(p *Pool)

// WithBlockNum sets how many free blocks the pool may keep cached for reuse.
func WithBlockNum(num int) Option {
	return func(pool *Pool) {
		cache := make(chan []byte, num)
		pool.blockCache = cache
	}
}

// WithBlockSize sets the size, in bytes, of every block the pool hands out.
func WithBlockSize(bytes int) Option {
	return func(pool *Pool) {
		pool.blockSize = bytes
	}
}

// WithAllocator sets the Allocator the pool uses to allocate and free blocks.
func WithAllocator(allocator Allocator) Option {
	return func(pool *Pool) {
		pool.allocator = allocator
	}
}

// WithPoolMemoryLimiter sets a limiter that bounds the block memory the pool
// hands out to buffers. Buffer.AllocBytes requests larger than the block size
// are served directly by the Go runtime and are NOT accounted by this limiter.
func WithPoolMemoryLimiter(limiter *Limiter) Option {
	return func(pool *Pool) {
		pool.limiter = limiter
	}
}
// NewPool creates a Pool. By default it allocates through the Go runtime,
// uses defaultBlockSize-byte blocks, caches up to defaultPoolSize free blocks
// and has no memory limiter. Options are applied in order, so a later option
// overrides an earlier one touching the same setting.
func NewPool(opts ...Option) *Pool {
	pool := new(Pool)
	pool.allocator = stdAllocator{}
	pool.blockSize = defaultBlockSize
	pool.blockCache = make(chan []byte, defaultPoolSize)
	for i := range opts {
		opts[i](pool)
	}
	return pool
}
// acquire hands out one block of blockSize bytes. A cached block is reused
// when one is immediately available; otherwise a new one is allocated. When a
// limiter is configured it is charged blockSize before the block is obtained.
func (p *Pool) acquire() []byte {
	if lim := p.limiter; lim != nil {
		lim.Acquire(p.blockSize)
	}
	var block []byte
	select {
	case block = <-p.blockCache:
	default:
		block = p.allocator.Alloc(p.blockSize)
	}
	return block
}
// release gives block b back to the pool. The block is kept in the cache when
// there is room and freed through the allocator otherwise; only after that is
// the limiter (if any) credited with blockSize.
func (p *Pool) release(b []byte) {
	cached := false
	select {
	case p.blockCache <- b:
		cached = true
	default:
	}
	if !cached {
		p.allocator.Free(b)
	}
	if lim := p.limiter; lim != nil {
		lim.Release(p.blockSize)
	}
}
// Destroy closes the block cache and frees every block still cached in it.
// The pool must not be used afterwards: releasing a block into a destroyed
// pool sends on the closed cache channel, which panics.
func (p *Pool) Destroy() {
	close(p.blockCache)
	for {
		block, ok := <-p.blockCache
		if !ok {
			return
		}
		p.allocator.Free(block)
	}
}
// TotalSize returns the memory, in bytes, held by the free blocks currently
// cached in this Pool. Blocks in use by Buffers are not counted.
func (p *Pool) TotalSize() int64 {
	// Widen before multiplying: the product of two ints can overflow a
	// 32-bit int even though the int64 result would fit.
	return int64(len(p.blockCache)) * int64(p.blockSize)
}
// Buffer represents a buffer that can allocate []byte from its memory. The
// memory is obtained from the owning Pool one block at a time.
type Buffer struct {
	// pool supplies blocks and, optionally, the memory limiter.
	pool *Pool

	// blocks are all blocks this Buffer has acquired, in acquisition order.
	blocks [][]byte

	// blockCntLimit caps how many blocks may be used; -1 means unlimited.
	blockCntLimit int

	// curBlock is the block that allocations are currently carved from.
	curBlock []byte

	// curBlockIdx is the index of curBlock in blocks; -1 when no block is
	// current (new or destroyed Buffer).
	curBlockIdx int

	// curIdx is the offset of the next free byte in curBlock.
	curIdx int

	// smallObjOverhead is the total acquired from the pool's limiter to
	// account for []byte / SliceLocation headers handed to callers.
	smallObjOverhead int

	// smallObjOverheadCache is the part of smallObjOverhead not yet consumed
	// by recorded headers.
	smallObjOverheadCache int
}

// BufferOption configures a buffer.
type BufferOption func(*Buffer)
// WithBufferMemoryLimit approximately limits the maximum memory size of this
// Buffer. Because memory is taken in whole blocks, the effective limit is
// blockSize*ceil(limit/blockSize).
func WithBufferMemoryLimit(limit uint64) BufferOption {
	return func(b *Buffer) {
		blockCntLimit := int(getBlockCnt(limit, uint64(b.pool.blockSize)))
		b.blockCntLimit = blockCntLimit

		// Blocks are acquired lazily and append grows the slice on demand,
		// so only pre-size it up to the same bound NewBuffer uses. Sizing it
		// to the full limit would waste memory for large limits and makes
		// make() panic when the capacity is out of range. A negative count
		// (the uint64 -> int conversion overflowed, which
		// allocBytesWithSliceLocation treats as unlimited) must not reach
		// make() either.
		const maxPreAllocBlocks = 128
		preAlloc := blockCntLimit
		if preAlloc < 0 || preAlloc > maxPreAllocBlocks {
			preAlloc = maxPreAllocBlocks
		}
		b.blocks = make([][]byte, 0, preAlloc)
	}
}
// GetAlignedSize returns size rounded up to a multiple of blockSize, i.e. the
// memory occupied when size bytes are stored in blocks of blockSize bytes.
// blockSize must be non-zero.
func GetAlignedSize(size, blockSize uint64) uint64 {
	return getBlockCnt(size, blockSize) * blockSize
}

// getBlockCnt returns ceil(size/blockSize), the number of blocks needed to
// hold size bytes. blockSize must be non-zero.
//
// The common (size+blockSize-1)/blockSize form is avoided on purpose: the
// addition wraps around for sizes close to math.MaxUint64 and then yields a
// far too small count (e.g. 0 blocks for a "limit" of MaxUint64).
func getBlockCnt(size, blockSize uint64) uint64 {
	cnt := size / blockSize
	if size%blockSize != 0 {
		cnt++
	}
	return cnt
}
// NewBuffer creates an empty Buffer backed by this pool. The Buffer acquires
// blocks from the pool only as allocations need them, and hands all of them
// back when Buffer.Destroy is called.
func (p *Pool) NewBuffer(opts ...BufferOption) *Buffer {
	buf := &Buffer{pool: p, blockCntLimit: -1, curBlockIdx: -1}
	for i := range opts {
		opts[i](buf)
	}
	// An option (e.g. WithBufferMemoryLimit) may already have sized the block
	// list; otherwise start with room for 128 block references.
	if buf.blocks == nil {
		buf.blocks = make([][]byte, 0, 128)
	}
	return buf
}
// smallObjOverheadBatch is the batch size to acquire memory from limiter. 256KB
// can store 256KB/24B = 10922 []byte objects, or 256KB/12B = 21845 SliceLocation
// objects.
const smallObjOverheadBatch = 256 * 1024

// recordSmallObjOverhead records n bytes of memory cost for a []byte or
// SliceLocation handed out by this Buffer. Memory is acquired from the pool's
// limiter in batches of smallObjOverheadBatch and consumed from a local cache,
// so the limiter is not called on every allocation. The caller must ensure
// the pool's limiter is not nil.
func (b *Buffer) recordSmallObjOverhead(n int) {
	// Keep acquiring batches until the cache covers n. A single acquisition
	// would let the cache go negative whenever n exceeds one batch, and the
	// overhead would then be under-reported to the limiter.
	for n > b.smallObjOverheadCache {
		b.pool.limiter.Acquire(smallObjOverheadBatch)
		b.smallObjOverheadCache += smallObjOverheadBatch
		b.smallObjOverhead += smallObjOverheadBatch
	}
	b.smallObjOverheadCache -= n
}
// releaseSmallObjOverhead returns to the pool's limiter everything this Buffer
// acquired for []byte / SliceLocation overhead, then clears the local
// accounting. The caller must ensure the pool's limiter is not nil.
func (b *Buffer) releaseSmallObjOverhead() {
	acquired := b.smallObjOverhead
	b.pool.limiter.Release(acquired)
	b.smallObjOverhead, b.smallObjOverheadCache = 0, 0
}
// Reset rewinds the buffer for reuse. Allocation restarts at the beginning of
// the first block, and every acquired block stays owned by this Buffer. When
// the pool has a limiter, the recorded small-object overhead is released. The
// caller must drop every []byte and SliceLocation obtained earlier before
// calling Reset, because later allocations may overwrite that memory.
func (b *Buffer) Reset() {
	if b.pool.limiter != nil {
		b.releaseSmallObjOverhead()
	}
	if len(b.blocks) == 0 {
		return
	}
	b.curBlockIdx, b.curIdx = 0, 0
	b.curBlock = b.blocks[b.curBlockIdx]
}
// Destroy hands every block of this Buffer back to the pool and, when the pool
// has a limiter, releases the recorded small-object overhead. Afterwards the
// Buffer owns no blocks. The caller must drop every []byte and SliceLocation
// obtained from this Buffer before calling Destroy.
func (b *Buffer) Destroy() {
	if b.pool.limiter != nil {
		b.releaseSmallObjOverhead()
	}
	for i := range b.blocks {
		b.pool.release(b.blocks[i])
	}
	b.blocks, b.curBlock = nil, nil
	b.curBlockIdx, b.curIdx = -1, 0
}
// TotalSize returns the memory, in bytes, of all blocks owned by this Buffer
// (used or not). Large allocations served directly by the Go runtime in
// AllocBytes are not included.
func (b *Buffer) TotalSize() int64 {
	// Widen before multiplying so the product cannot overflow a 32-bit int.
	return int64(len(b.blocks)) * int64(b.pool.blockSize)
}
// sizeOfSlice is the in-memory size of a []byte header; it is the overhead
// recorded for each slice AllocBytes carves from the Buffer.
var sizeOfSlice = int(unsafe.Sizeof([]byte(nil)))

// AllocBytes allocates a []byte of length n. Requests larger than the pool's
// block size are served directly by the Go runtime and are not accounted by
// the pool's limiter. Smaller requests are carved out of this Buffer's blocks.
// For n > 0, a nil result means the Buffer's block limit was reached.
func (b *Buffer) AllocBytes(n int) []byte {
	if n > b.pool.blockSize {
		return make([]byte, n)
	}
	buf, _ := b.allocBytesWithSliceLocation(n)
	if buf == nil || b.pool.limiter == nil {
		return buf
	}
	b.recordSmallObjOverhead(sizeOfSlice)
	return buf
}
// SliceLocation is like a reflect.SliceHeader, but it's associated with a
// Buffer. The advantage is that it's smaller than a slice, and it doesn't
// contain a pointer, which makes it more GC-friendly.
type SliceLocation struct {
	// bufIdx is the index, in the Buffer's blocks, of the block holding the
	// bytes.
	bufIdx int32

	// offset is where the bytes start within that block.
	offset int32

	// Length is the number of bytes.
	Length int32
}

// sizeOfSliceLocation is the in-memory size of a SliceLocation; it is the
// overhead recorded for each successful AllocBytesWithSliceLocation call.
var sizeOfSliceLocation = int(unsafe.Sizeof(SliceLocation{}))
// allocBytesWithSliceLocation carves n bytes out of the Buffer's blocks and
// returns them together with their location. When the current block lacks
// room it moves on to the next block. It returns (nil, SliceLocation{}) when n
// exceeds the block size or the block limit forbids moving on. The returned
// slice's capacity equals its length, so appending to it cannot overwrite
// neighbouring allocations.
func (b *Buffer) allocBytesWithSliceLocation(n int) ([]byte, SliceLocation) {
	if n > b.pool.blockSize {
		return nil, SliceLocation{}
	}
	if fits := b.curIdx+n <= len(b.curBlock); !fits {
		limited := b.blockCntLimit >= 0
		if limited && b.curBlockIdx+1 >= b.blockCntLimit {
			return nil, SliceLocation{}
		}
		b.addBlock()
	}
	start, end := b.curIdx, b.curIdx+n
	b.curIdx = end
	loc := SliceLocation{
		bufIdx: int32(b.curBlockIdx),
		offset: int32(start),
		Length: int32(n),
	}
	return b.curBlock[start:end:end], loc
}
// AllocBytesWithSliceLocation is like AllocBytes, but it always allocates from
// the pool's blocks rather than from the Go runtime. The expected usage is to
// write data into the returned slice **without storing the slice**, keep only
// the SliceLocation, and later recover the bytes with GetSlice. When many
// slices are kept in memory this reduces memory usage. A nil returned slice
// means the allocation failed.
func (b *Buffer) AllocBytesWithSliceLocation(n int) ([]byte, SliceLocation) {
	buf, loc := b.allocBytesWithSliceLocation(n)
	if buf == nil {
		return nil, loc
	}
	if b.pool.limiter != nil {
		b.recordSmallObjOverhead(sizeOfSliceLocation)
	}
	return buf, loc
}
// addBlock makes the next block current. It reuses a block this Buffer
// already owns (e.g. after Reset) when one follows the current block, and
// otherwise appends a new block acquired from the pool. Allocation then
// restarts at offset 0 of the new current block.
func (b *Buffer) addBlock() {
	if b.curBlockIdx+1 < len(b.blocks) {
		b.curBlockIdx++
	} else {
		b.blocks = append(b.blocks, b.pool.acquire())
		b.curBlockIdx = len(b.blocks) - 1
	}
	b.curBlock = b.blocks[b.curBlockIdx]
	b.curIdx = 0
}
// GetSlice returns the bytes referenced by loc. The loc is expected to come
// from AllocBytesWithSliceLocation on this Buffer, with no Reset or Destroy in
// between.
func (b *Buffer) GetSlice(loc *SliceLocation) []byte {
	block := b.blocks[loc.bufIdx]
	start := loc.offset
	end := start + loc.Length
	return block[start:end]
}
// AddBytes copies bytes into memory managed by this Buffer and returns the
// copy. For non-empty input, a nil result means the Buffer's block limit was
// reached and nothing was copied.
func (b *Buffer) AddBytes(bytes []byte) []byte {
	dst := b.AllocBytes(len(bytes))
	if dst != nil {
		copy(dst, bytes)
	}
	return dst
}