406 lines
9.1 KiB
Go
406 lines
9.1 KiB
Go
// Copyright 2021 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package membuf
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/rand"
|
|
rand2 "math/rand"
|
|
"runtime"
|
|
"slices"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// testAllocator is a counting allocator for the tests in this file. It records
// how often Alloc and Free are called so a test can assert on those numbers.
type testAllocator struct {
	// allocs is the number of Alloc calls made so far.
	allocs int
	// frees is the number of Free calls made so far.
	frees int
}

// Alloc counts the call and returns a newly made byte slice of length n.
func (a *testAllocator) Alloc(n int) []byte {
	a.allocs++
	buf := make([]byte, n)
	return buf
}

// Free only counts the call; nothing is done with the slice that is passed in.
func (a *testAllocator) Free([]byte) {
	a.frees++
}
|
|
|
|
func TestBufferPool(t *testing.T) {
|
|
allocator := &testAllocator{}
|
|
pool := NewPool(
|
|
WithBlockNum(2),
|
|
WithAllocator(allocator),
|
|
WithBlockSize(1024),
|
|
)
|
|
defer pool.Destroy()
|
|
|
|
bytesBuf := pool.NewBuffer()
|
|
bytesBuf.AllocBytes(256)
|
|
require.Equal(t, 1, allocator.allocs)
|
|
bytesBuf.AllocBytes(512)
|
|
require.Equal(t, 1, allocator.allocs)
|
|
bytesBuf.AllocBytes(257)
|
|
require.Equal(t, 2, allocator.allocs)
|
|
bytesBuf.AllocBytes(767)
|
|
require.Equal(t, 2, allocator.allocs)
|
|
|
|
largeBytes := bytesBuf.AllocBytes(1025)
|
|
require.Equal(t, 1025, len(largeBytes))
|
|
require.Equal(t, 2, allocator.allocs)
|
|
|
|
require.Equal(t, 0, allocator.frees)
|
|
bytesBuf.Destroy()
|
|
require.Equal(t, 0, allocator.frees)
|
|
|
|
bytesBuf = pool.NewBuffer()
|
|
for range 6 {
|
|
bytesBuf.AllocBytes(512)
|
|
}
|
|
bytesBuf.Destroy()
|
|
require.Equal(t, 3, allocator.allocs)
|
|
require.Equal(t, 1, allocator.frees)
|
|
}
|
|
|
|
func TestPoolMemLimit(t *testing.T) {
|
|
limiter := NewLimiter(2*1024*1024 + 2*smallObjOverheadBatch)
|
|
// only allow to allocate one block
|
|
pool := NewPool(
|
|
WithBlockSize(2*1024*1024),
|
|
WithPoolMemoryLimiter(limiter),
|
|
)
|
|
defer pool.Destroy()
|
|
buf := pool.NewBuffer()
|
|
buf.AllocBytes(1024 * 1024)
|
|
buf.AllocBytes(1024 * 1024)
|
|
|
|
buf2 := pool.NewBuffer()
|
|
done := make(chan struct{}, 1)
|
|
go func() {
|
|
buf2.AllocBytes(1024 * 1024)
|
|
buf2.Destroy()
|
|
done <- struct{}{}
|
|
}()
|
|
|
|
// sleep a while to make sure the goroutine is started
|
|
time.Sleep(50 * time.Millisecond)
|
|
require.Len(t, done, 0)
|
|
// reset will not release memory to pool
|
|
buf.Reset()
|
|
buf.AllocBytes(1024 * 1024)
|
|
buf.AllocBytes(1024 * 1024)
|
|
require.Len(t, done, 0)
|
|
// destroy will release memory to pool
|
|
buf.Destroy()
|
|
// wait buf2 to finish
|
|
require.Eventually(t, func() bool {
|
|
return len(done) > 0
|
|
}, time.Second, 10*time.Millisecond)
|
|
// after buf2 is finished, still can allocate memory from pool
|
|
buf.AllocBytes(2 * 1024 * 1024)
|
|
buf.Destroy()
|
|
}
|
|
|
|
func TestBufferIsolation(t *testing.T) {
|
|
pool := NewPool(WithBlockSize(1024))
|
|
defer pool.Destroy()
|
|
bytesBuf := pool.NewBuffer()
|
|
defer bytesBuf.Destroy()
|
|
|
|
b1 := bytesBuf.AllocBytes(16)
|
|
b2 := bytesBuf.AllocBytes(16)
|
|
require.Len(t, b1, cap(b1))
|
|
require.Len(t, b2, cap(b2))
|
|
|
|
_, err := rand.Read(b2)
|
|
require.NoError(t, err)
|
|
b3 := slices.Clone(b2)
|
|
b1 = append(b1, 0, 1, 2, 3)
|
|
require.Equal(t, b3, b2)
|
|
require.NotEqual(t, b2, b1)
|
|
}
|
|
|
|
func TestBufferMemLimit(t *testing.T) {
|
|
pool := NewPool(WithBlockSize(10))
|
|
defer pool.Destroy()
|
|
// the actual memory limit is 10 bytes.
|
|
bytesBuf := pool.NewBuffer(WithBufferMemoryLimit(5))
|
|
|
|
got, _ := bytesBuf.AllocBytesWithSliceLocation(9)
|
|
require.NotNil(t, got)
|
|
got, _ = bytesBuf.AllocBytesWithSliceLocation(3)
|
|
require.Nil(t, got)
|
|
|
|
bytesBuf.Destroy()
|
|
// test the buffer is still usable after destroy.
|
|
got, _ = bytesBuf.AllocBytesWithSliceLocation(3)
|
|
require.NotNil(t, got)
|
|
|
|
// exactly 2 block
|
|
bytesBuf = pool.NewBuffer(WithBufferMemoryLimit(20))
|
|
|
|
got, _ = bytesBuf.AllocBytesWithSliceLocation(9)
|
|
require.NotNil(t, got)
|
|
got, _ = bytesBuf.AllocBytesWithSliceLocation(9)
|
|
require.NotNil(t, got)
|
|
got, _ = bytesBuf.AllocBytesWithSliceLocation(2)
|
|
require.Nil(t, got)
|
|
|
|
// after reset, can get same allocation again
|
|
bytesBuf.Reset()
|
|
|
|
got, _ = bytesBuf.AllocBytesWithSliceLocation(9)
|
|
require.NotNil(t, got)
|
|
got, _ = bytesBuf.AllocBytesWithSliceLocation(9)
|
|
require.NotNil(t, got)
|
|
got, _ = bytesBuf.AllocBytesWithSliceLocation(2)
|
|
require.Nil(t, got)
|
|
}
|
|
|
|
func TestGetAlignedSizeGetBlockCnt(t *testing.T) {
|
|
require.EqualValues(t, 1, getBlockCnt(10, 16))
|
|
require.EqualValues(t, 2, getBlockCnt(17, 16))
|
|
require.EqualValues(t, 16, GetAlignedSize(10, 16))
|
|
require.EqualValues(t, 32, GetAlignedSize(17, 16))
|
|
}
|
|
|
|
// dataNum is the number of elements (100 * 1024 * 1024) in the slices that
// BenchmarkStoreSlice and BenchmarkStoreLocation fill on every iteration.
const dataNum = 100 * 1024 * 1024
|
|
|
|
func BenchmarkStoreSlice(b *testing.B) {
|
|
data := make([][]byte, dataNum)
|
|
for b.Loop() {
|
|
func() {
|
|
pool := NewPool()
|
|
defer pool.Destroy()
|
|
bytesBuf := pool.NewBuffer()
|
|
defer bytesBuf.Destroy()
|
|
|
|
for j := range data {
|
|
data[j] = bytesBuf.AllocBytes(10)
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
func BenchmarkStoreLocation(b *testing.B) {
|
|
data := make([]SliceLocation, dataNum)
|
|
for b.Loop() {
|
|
func() {
|
|
pool := NewPool()
|
|
defer pool.Destroy()
|
|
bytesBuf := pool.NewBuffer()
|
|
defer bytesBuf.Destroy()
|
|
|
|
for j := range data {
|
|
_, data[j] = bytesBuf.AllocBytesWithSliceLocation(10)
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
// sortDataNum is the number of elements (1024 * 1024) that each of the
// BenchmarkSort* benchmarks allocates, fills and sorts per iteration.
const sortDataNum = 1024 * 1024
|
|
|
|
func BenchmarkSortSlice(b *testing.B) {
|
|
data := make([][]byte, sortDataNum)
|
|
// fixed seed for benchmark
|
|
rnd := rand2.New(rand2.NewSource(6716))
|
|
|
|
for b.Loop() {
|
|
func() {
|
|
pool := NewPool()
|
|
defer pool.Destroy()
|
|
bytesBuf := pool.NewBuffer()
|
|
defer bytesBuf.Destroy()
|
|
|
|
for j := range data {
|
|
data[j] = bytesBuf.AllocBytes(10)
|
|
rnd.Read(data[j])
|
|
}
|
|
slices.SortFunc(data, func(a, b []byte) int {
|
|
return bytes.Compare(a, b)
|
|
})
|
|
}()
|
|
}
|
|
}
|
|
|
|
func BenchmarkSortLocation(b *testing.B) {
|
|
data := make([]SliceLocation, sortDataNum)
|
|
// fixed seed for benchmark
|
|
rnd := rand2.New(rand2.NewSource(6716))
|
|
|
|
for b.Loop() {
|
|
func() {
|
|
pool := NewPool()
|
|
defer pool.Destroy()
|
|
bytesBuf := pool.NewBuffer()
|
|
defer bytesBuf.Destroy()
|
|
|
|
for j := range data {
|
|
var buf []byte
|
|
buf, data[j] = bytesBuf.AllocBytesWithSliceLocation(10)
|
|
rnd.Read(buf)
|
|
}
|
|
slices.SortFunc(data, func(a, b SliceLocation) int {
|
|
return bytes.Compare(bytesBuf.GetSlice(&a), bytesBuf.GetSlice(&b))
|
|
})
|
|
}()
|
|
}
|
|
}
|
|
|
|
func BenchmarkSortSliceWithGC(b *testing.B) {
|
|
data := make([][]byte, sortDataNum)
|
|
// fixed seed for benchmark
|
|
rnd := rand2.New(rand2.NewSource(6716))
|
|
|
|
for b.Loop() {
|
|
func() {
|
|
pool := NewPool()
|
|
defer pool.Destroy()
|
|
bytesBuf := pool.NewBuffer()
|
|
defer bytesBuf.Destroy()
|
|
|
|
for j := range data {
|
|
data[j] = bytesBuf.AllocBytes(10)
|
|
rnd.Read(data[j])
|
|
}
|
|
runtime.GC()
|
|
slices.SortFunc(data, func(a, b []byte) int {
|
|
return bytes.Compare(a, b)
|
|
})
|
|
}()
|
|
}
|
|
}
|
|
|
|
func BenchmarkSortLocationWithGC(b *testing.B) {
|
|
data := make([]SliceLocation, sortDataNum)
|
|
// fixed seed for benchmark
|
|
rnd := rand2.New(rand2.NewSource(6716))
|
|
|
|
for b.Loop() {
|
|
func() {
|
|
pool := NewPool()
|
|
defer pool.Destroy()
|
|
bytesBuf := pool.NewBuffer()
|
|
defer bytesBuf.Destroy()
|
|
|
|
for j := range data {
|
|
var buf []byte
|
|
buf, data[j] = bytesBuf.AllocBytesWithSliceLocation(10)
|
|
rnd.Read(buf)
|
|
}
|
|
runtime.GC()
|
|
slices.SortFunc(data, func(a, b SliceLocation) int {
|
|
return bytes.Compare(bytesBuf.GetSlice(&a), bytesBuf.GetSlice(&b))
|
|
})
|
|
}()
|
|
}
|
|
}
|
|
|
|
func BenchmarkSortLocationWithEscape(b *testing.B) {
|
|
data := make([]SliceLocation, sortDataNum)
|
|
// fixed seed for benchmark
|
|
rnd := rand2.New(rand2.NewSource(6716))
|
|
|
|
for b.Loop() {
|
|
func() {
|
|
pool := NewPool()
|
|
defer pool.Destroy()
|
|
bytesBuf := pool.NewBuffer()
|
|
defer bytesBuf.Destroy()
|
|
|
|
for j := range data {
|
|
var buf []byte
|
|
buf, data[j] = bytesBuf.AllocBytesWithSliceLocation(10)
|
|
rnd.Read(buf)
|
|
}
|
|
|
|
var (
|
|
dupFound bool
|
|
dupLoc *SliceLocation
|
|
)
|
|
slices.SortFunc(data, func(a, b SliceLocation) int {
|
|
res := bytes.Compare(bytesBuf.GetSlice(&a), bytesBuf.GetSlice(&b))
|
|
if res == 0 && !dupFound {
|
|
dupFound = true
|
|
dupLoc = &a
|
|
}
|
|
return res
|
|
})
|
|
_ = dupLoc
|
|
}()
|
|
}
|
|
}
|
|
|
|
func BenchmarkSortLocationWithoutEscape(b *testing.B) {
|
|
data := make([]SliceLocation, sortDataNum)
|
|
// fixed seed for benchmark
|
|
rnd := rand2.New(rand2.NewSource(6716))
|
|
|
|
for b.Loop() {
|
|
func() {
|
|
pool := NewPool()
|
|
defer pool.Destroy()
|
|
bytesBuf := pool.NewBuffer()
|
|
defer bytesBuf.Destroy()
|
|
|
|
for j := range data {
|
|
var buf []byte
|
|
buf, data[j] = bytesBuf.AllocBytesWithSliceLocation(10)
|
|
rnd.Read(buf)
|
|
}
|
|
|
|
var (
|
|
dupFound bool
|
|
dupLoc SliceLocation
|
|
)
|
|
slices.SortFunc(data, func(a, b SliceLocation) int {
|
|
res := bytes.Compare(bytesBuf.GetSlice(&a), bytesBuf.GetSlice(&b))
|
|
if res == 0 && !dupFound {
|
|
dupFound = true
|
|
dupLoc = a
|
|
}
|
|
return res
|
|
})
|
|
_ = dupLoc
|
|
}()
|
|
}
|
|
}
|
|
|
|
func BenchmarkConcurrentAcquire(b *testing.B) {
|
|
for b.Loop() {
|
|
limiter := NewLimiter(512 * 1024 * 1024)
|
|
pool := NewPool(WithPoolMemoryLimiter(limiter), WithBlockSize(4*1024))
|
|
// start 1000 clients, each client will acquire 100B for 1000 times.
|
|
wg := sync.WaitGroup{}
|
|
clientNum := 1000
|
|
wg.Add(clientNum)
|
|
for range clientNum {
|
|
go func() {
|
|
defer wg.Done()
|
|
buf := pool.NewBuffer()
|
|
for range 1000 {
|
|
buf.AllocBytes(100)
|
|
}
|
|
buf.Destroy()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
pool.Destroy()
|
|
}
|
|
}
|