statistics: improve the memory usage for stats cache (#46586)
close pingcap/tidb#46585
This commit is contained in:
@ -19,6 +19,7 @@ go_library(
|
||||
"//util/memory",
|
||||
"@com_github_dgraph_io_ristretto//:ristretto",
|
||||
"@org_golang_x_exp//maps",
|
||||
"@org_golang_x_exp//rand",
|
||||
"@org_uber_go_zap//:zap",
|
||||
],
|
||||
)
|
||||
@ -30,7 +31,7 @@ go_test(
|
||||
embed = [":lfu"],
|
||||
flaky = True,
|
||||
race = "on",
|
||||
shard_count = 8,
|
||||
shard_count = 9,
|
||||
deps = [
|
||||
"//statistics",
|
||||
"//statistics/handle/cache/internal/testutil",
|
||||
|
||||
@ -27,6 +27,7 @@ import (
|
||||
"github.com/pingcap/tidb/util/mathutil"
|
||||
"github.com/pingcap/tidb/util/memory"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/rand"
|
||||
)
|
||||
|
||||
// LFU is a LFU based on the ristretto.Cache
|
||||
@ -58,7 +59,7 @@ func NewLFU(totalMemCost int64) (*LFU, error) {
|
||||
bufferItems := int64(64)
|
||||
|
||||
cache, err := ristretto.NewCache(&ristretto.Config{
|
||||
NumCounters: mathutil.Max(mathutil.Min(totalMemCost/128*2, 2_000_000), 10), // assume the cost per table stats is 128
|
||||
NumCounters: mathutil.Max(mathutil.Min(totalMemCost/128, 1_000_000), 10), // assume the cost per table stats is 128
|
||||
MaxCost: totalMemCost,
|
||||
BufferItems: bufferItems,
|
||||
OnEvict: result.onEvict,
|
||||
@ -170,6 +171,16 @@ func (s *LFU) dropMemory(item *ristretto.Item) {
|
||||
s.addCost(after)
|
||||
}
|
||||
|
||||
func (s *LFU) triggerEvict() {
|
||||
// When the memory usage of the cache exceeds the maximum value, Many item need to evict. But
|
||||
// ristretto'c cache execute the evict operation when to write the cache. for we can evict as soon as possible,
|
||||
// we will write some fake item to the cache. fake item have a negative key, and the value is nil.
|
||||
if s.Cost() > s.cache.MaxCost() {
|
||||
//nolint: gosec
|
||||
s.cache.Set(-rand.Int(), nil, 0)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LFU) onExit(val any) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
@ -201,6 +212,7 @@ func (s *LFU) Copy() internal.StatsCacheInner {
|
||||
// SetCapacity implements statsCacheInner
|
||||
func (s *LFU) SetCapacity(maxCost int64) {
|
||||
s.cache.UpdateMaxCost(maxCost)
|
||||
s.triggerEvict()
|
||||
metrics.CapacityGauge.Set(float64(maxCost))
|
||||
}
|
||||
|
||||
|
||||
@ -260,7 +260,7 @@ func TestLFUReject(t *testing.T) {
|
||||
require.True(t, lfu.Put(2, t2))
|
||||
lfu.wait()
|
||||
time.Sleep(3 * time.Second)
|
||||
require.Equal(t, int64(12), lfu.Cost())
|
||||
require.Equal(t, int64(0), lfu.Cost())
|
||||
require.Len(t, lfu.Values(), 2)
|
||||
v, ok := lfu.Get(2, false)
|
||||
require.True(t, ok)
|
||||
@ -271,3 +271,35 @@ func TestLFUReject(t *testing.T) {
|
||||
require.Equal(t, statistics.AllEvicted, i.GetEvictedStatus())
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemoryControl(t *testing.T) {
|
||||
testMode = true
|
||||
capacity := int64(100000000000)
|
||||
lfu, err := NewLFU(capacity)
|
||||
require.NoError(t, err)
|
||||
t1 := testutil.NewMockStatisticsTable(2, 1, true, false, false)
|
||||
require.Equal(t, 2*mockCMSMemoryUsage+mockCMSMemoryUsage, t1.MemoryUsage().TotalTrackingMemUsage())
|
||||
lfu.Put(1, t1)
|
||||
lfu.wait()
|
||||
|
||||
for i := 2; i <= 1000; i++ {
|
||||
t1 := testutil.NewMockStatisticsTable(2, 1, true, false, false)
|
||||
require.Equal(t, 2*mockCMSMemoryUsage+mockCMSMemoryUsage, t1.MemoryUsage().TotalTrackingMemUsage())
|
||||
lfu.Put(int64(i), t1)
|
||||
}
|
||||
require.Equal(t, 1000*(2*mockCMSMemoryUsage+mockCMSMemoryUsage), lfu.Cost())
|
||||
|
||||
for i := 1000; i > 990; i-- {
|
||||
lfu.SetCapacity(int64(i-1) * (2*mockCMSMemoryUsage + mockCMSMemoryUsage))
|
||||
lfu.wait()
|
||||
require.Equal(t, int64(i-1)*(2*mockCMSMemoryUsage+mockCMSMemoryUsage), lfu.Cost())
|
||||
}
|
||||
for i := 990; i > 100; i = i - 100 {
|
||||
lfu.SetCapacity(int64(i-1) * (2*mockCMSMemoryUsage + mockCMSMemoryUsage))
|
||||
lfu.wait()
|
||||
require.Equal(t, int64(i-1)*(2*mockCMSMemoryUsage+mockCMSMemoryUsage), lfu.Cost())
|
||||
}
|
||||
lfu.SetCapacity(int64(10) * (2*mockCMSMemoryUsage + mockCMSMemoryUsage))
|
||||
lfu.wait()
|
||||
require.Equal(t, int64(10)*(2*mockCMSMemoryUsage+mockCMSMemoryUsage), lfu.Cost())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user