statistics: introduce an interface for StatsCache (#20091)
This commit is contained in:
1
go.sum
1
go.sum
@ -624,6 +624,7 @@ github.com/pborman/getopt v0.0.0-20180729010549-6fdd0a2c7117/go.mod h1:85jBQOZwp
|
||||
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
||||
github.com/pelletier/go-toml v1.3.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo=
|
||||
github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc=
|
||||
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
|
||||
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
|
||||
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 h1:JhzVVoYvbOACxoUmOs6V/G4D5nPVUW73rKvXxP4XUJc=
|
||||
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
|
||||
|
||||
@ -481,7 +481,7 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
h.statsCache.initStatsCache(tables, version)
|
||||
h.statsCache.InitStatsCache(tables, version)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -59,7 +59,7 @@ type Handle struct {
|
||||
|
||||
// It can be read by multiple readers at the same time without acquiring lock, but it can be
|
||||
// written only after acquiring the lock.
|
||||
statsCache *statsCache
|
||||
statsCache StatsCache
|
||||
|
||||
restrictedExec sqlexec.RestrictedSQLExecutor
|
||||
|
||||
@ -112,10 +112,14 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration) (*Handle, error) {
|
||||
if exec, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok {
|
||||
handle.restrictedExec = exec
|
||||
}
|
||||
handle.statsCache = newStatsCache(ctx.GetSessionVars().MemQuotaStatistics)
|
||||
var err error
|
||||
handle.statsCache, err = newStatsCacheWithMemCap(ctx.GetSessionVars().MemQuotaStatistics, defaultStatsCacheType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
handle.mu.ctx = ctx
|
||||
handle.mu.rateMap = make(errorRateDeltaMap)
|
||||
err := handle.RefreshVars()
|
||||
err = handle.RefreshVars()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -232,17 +236,14 @@ func buildPartitionID2TableID(is infoschema.InfoSchema) map[int64]int64 {
|
||||
|
||||
// GetMemConsumed returns the mem size of statscache consumed
|
||||
func (h *Handle) GetMemConsumed() (size int64) {
|
||||
h.statsCache.mu.Lock()
|
||||
size = h.statsCache.memTracker.BytesConsumed()
|
||||
h.statsCache.mu.Unlock()
|
||||
return
|
||||
return h.statsCache.BytesConsumed()
|
||||
}
|
||||
|
||||
// EraseTable4Test erase a table by ID and add new empty (with Meta) table.
|
||||
// ONLY used for test.
|
||||
func (h *Handle) EraseTable4Test(ID int64) {
|
||||
table, _ := h.statsCache.Lookup(ID)
|
||||
h.statsCache.Insert(table.CopyWithoutBucketsAndCMS())
|
||||
h.statsCache.Update([]*statistics.Table{table.CopyWithoutBucketsAndCMS()}, nil, h.statsCache.GetVersion())
|
||||
}
|
||||
|
||||
// GetAllTableStatsMemUsage4Test get all the mem usage with true table.
|
||||
@ -276,10 +277,7 @@ func (h *Handle) GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statist
|
||||
// SetBytesLimit4Test sets the bytes limit for this tracker. "bytesLimit <= 0" means no limit.
|
||||
// Only used for test.
|
||||
func (h *Handle) SetBytesLimit4Test(bytesLimit int64) {
|
||||
h.statsCache.mu.Lock()
|
||||
h.statsCache.memTracker.SetBytesLimit(bytesLimit)
|
||||
h.statsCache.memCapacity = bytesLimit
|
||||
h.statsCache.mu.Unlock()
|
||||
h.statsCache.SetBytesLimit(bytesLimit)
|
||||
}
|
||||
|
||||
// CanRuntimePrune indicates whether tbl support runtime prune for table and first partition id.
|
||||
@ -964,7 +962,6 @@ func (h *Handle) ReloadExtendedStatistics() error {
|
||||
tables := make([]*statistics.Table, 0, len(allTables))
|
||||
for _, tbl := range allTables {
|
||||
t, err := h.extendedStatsFromStorage(reader, tbl.Copy(), tbl.PhysicalID, true)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -134,7 +134,6 @@ func (s *testStatsSuite) TestStatsCacheMemTracker(c *C) {
|
||||
|
||||
testKit.MustExec("analyze table t")
|
||||
statsTbl = do.StatsHandle().GetTableStats(tableInfo)
|
||||
|
||||
c.Assert(statsTbl.Pseudo, IsFalse)
|
||||
|
||||
// If the new schema drop a column, the table stats can still work.
|
||||
|
||||
@ -15,6 +15,7 @@ package handle
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/pingcap/tidb/statistics"
|
||||
@ -22,8 +23,40 @@ import (
|
||||
"github.com/pingcap/tidb/util/memory"
|
||||
)
|
||||
|
||||
// statsCache caches table statistics.
|
||||
type statsCache struct {
|
||||
// StatsCache is an interface for the collection of statistics.
|
||||
type StatsCache interface {
|
||||
Lookup(id int64) (*statistics.Table, bool)
|
||||
Update(tables []*statistics.Table, deletedIDs []int64, newVersion uint64)
|
||||
GetVersion() uint64
|
||||
InitStatsCache(tables map[int64]*statistics.Table, version uint64)
|
||||
GetAll() []*statistics.Table
|
||||
|
||||
// Interface below are used only for test.
|
||||
Clear()
|
||||
GetBytesLimit() int64
|
||||
SetBytesLimit(bytesLimit int64)
|
||||
BytesConsumed() int64
|
||||
}
|
||||
|
||||
type statsCacheType int8
|
||||
|
||||
const (
|
||||
simpleLRUCache statsCacheType = iota
|
||||
)
|
||||
|
||||
var defaultStatsCacheType = simpleLRUCache
|
||||
|
||||
// newStatsCacheWithMemCap returns a new stats cache with memory capacity.
|
||||
func newStatsCacheWithMemCap(memoryCapacity int64, tp statsCacheType) (StatsCache, error) {
|
||||
switch tp {
|
||||
case simpleLRUCache:
|
||||
return newSimpleLRUStatsCache(memoryCapacity), nil
|
||||
}
|
||||
return nil, errors.New("wrong statsCache type")
|
||||
}
|
||||
|
||||
// simpleLRUStatsCache uses the simpleLRUCache to store the cache of statistics.
|
||||
type simpleLRUStatsCache struct {
|
||||
mu sync.Mutex
|
||||
cache *kvcache.SimpleLRUCache
|
||||
memCapacity int64
|
||||
@ -39,47 +72,35 @@ func (key statsCacheKey) Hash() []byte {
|
||||
return buf
|
||||
}
|
||||
|
||||
// newStatsCache returns a new statsCache with capacity maxMemoryLimit.
|
||||
func newStatsCache(memoryLimit int64) *statsCache {
|
||||
// Since newStatsCache controls the memory usage by itself, set the capacity of
|
||||
func newSimpleLRUStatsCache(memoryCapacity int64) *simpleLRUStatsCache {
|
||||
// since stats cache controls the memory usage by itself, set the capacity of
|
||||
// the underlying LRUCache to max to close its memory control
|
||||
cache := kvcache.NewSimpleLRUCache(uint(memoryLimit), 0.1, 0)
|
||||
c := statsCache{
|
||||
cache := kvcache.NewSimpleLRUCache(uint(memoryCapacity), 0.1, 0)
|
||||
c := simpleLRUStatsCache{
|
||||
cache: cache,
|
||||
memCapacity: memoryLimit,
|
||||
memTracker: memory.NewTracker(memory.LabelForStatsCache, -1),
|
||||
memCapacity: memoryCapacity,
|
||||
memTracker: memory.NewTracker(memory.LabelForStatsCache, memoryCapacity),
|
||||
}
|
||||
return &c
|
||||
}
|
||||
|
||||
// Clear clears the statsCache.
|
||||
func (sc *statsCache) Clear() {
|
||||
// Since newStatsCache controls the memory usage by itself, set the capacity of
|
||||
// the underlying LRUCache to max to close its memory control
|
||||
// SetBytesLimit sets the bytes limit for this tracker.
|
||||
func (sc *simpleLRUStatsCache) SetBytesLimit(BytesLimit int64) {
|
||||
sc.mu.Lock()
|
||||
defer sc.mu.Unlock()
|
||||
cache := kvcache.NewSimpleLRUCache(uint(sc.memCapacity), 0.1, 0)
|
||||
sc.memTracker.ReplaceBytesUsed(0)
|
||||
sc.cache = cache
|
||||
sc.version = 0
|
||||
sc.memTracker.SetBytesLimit(BytesLimit)
|
||||
sc.memCapacity = BytesLimit
|
||||
}
|
||||
|
||||
// GetAll get all the tables point.
|
||||
func (sc *statsCache) GetAll() []*statistics.Table {
|
||||
// BytesConsumed returns the consumed memory usage value in bytes.
|
||||
func (sc *simpleLRUStatsCache) BytesConsumed() int64 {
|
||||
sc.mu.Lock()
|
||||
defer sc.mu.Unlock()
|
||||
values := sc.cache.GetAll()
|
||||
tables := make([]*statistics.Table, 0)
|
||||
for _, v := range values {
|
||||
if t, ok := v.(*statistics.Table); ok && t != nil {
|
||||
tables = append(tables, t)
|
||||
}
|
||||
}
|
||||
return tables
|
||||
return sc.memTracker.BytesConsumed()
|
||||
}
|
||||
|
||||
// lookupUnsafe get table with id without Lock.
|
||||
func (sc *statsCache) lookupUnsafe(id int64) (*statistics.Table, bool) {
|
||||
func (sc *simpleLRUStatsCache) lookupUnsafe(id int64) (*statistics.Table, bool) {
|
||||
var key = statsCacheKey(id)
|
||||
value, hit := sc.cache.Get(key)
|
||||
if !hit {
|
||||
@ -89,8 +110,17 @@ func (sc *statsCache) lookupUnsafe(id int64) (*statistics.Table, bool) {
|
||||
return table, true
|
||||
}
|
||||
|
||||
// Clear clears the cache
|
||||
func (sc *simpleLRUStatsCache) Clear() {
|
||||
sc.mu.Lock()
|
||||
defer sc.mu.Unlock()
|
||||
sc.version = 0
|
||||
sc.cache.DeleteAll()
|
||||
sc.memTracker = memory.NewTracker(memory.LabelForStatsCache, sc.memCapacity)
|
||||
}
|
||||
|
||||
// Lookup get table with id.
|
||||
func (sc *statsCache) Lookup(id int64) (*statistics.Table, bool) {
|
||||
func (sc *simpleLRUStatsCache) Lookup(id int64) (*statistics.Table, bool) {
|
||||
sc.mu.Lock()
|
||||
defer sc.mu.Unlock()
|
||||
return sc.lookupUnsafe(id)
|
||||
@ -99,7 +129,7 @@ func (sc *statsCache) Lookup(id int64) (*statistics.Table, bool) {
|
||||
// Insert inserts a new table to the statsCache.
|
||||
// If the memory consumption exceeds the capacity, remove the buckets and
|
||||
// CMSketch of the oldest cache and add metadata of it
|
||||
func (sc *statsCache) Insert(table *statistics.Table) {
|
||||
func (sc *simpleLRUStatsCache) Insert(table *statistics.Table) {
|
||||
if table == nil {
|
||||
return
|
||||
}
|
||||
@ -123,21 +153,34 @@ func (sc *statsCache) Insert(table *statistics.Table) {
|
||||
return
|
||||
}
|
||||
|
||||
// Erase erase a stateCache with physical id.
|
||||
func (sc *statsCache) Erase(deletedID int64) bool {
|
||||
// Erase removes a stateCache with physical id.
|
||||
func (sc *simpleLRUStatsCache) Erase(deletedID int64) bool {
|
||||
table, hit := sc.lookupUnsafe(deletedID)
|
||||
if !hit {
|
||||
return false
|
||||
}
|
||||
|
||||
key := statsCacheKey(deletedID)
|
||||
sc.cache.Delete(key)
|
||||
sc.memTracker.Consume(-table.MemoryUsage())
|
||||
return true
|
||||
}
|
||||
|
||||
// GetAll get all the tables point.
|
||||
func (sc *simpleLRUStatsCache) GetAll() []*statistics.Table {
|
||||
sc.mu.Lock()
|
||||
defer sc.mu.Unlock()
|
||||
values := sc.cache.GetAll()
|
||||
tables := make([]*statistics.Table, 0, len(values))
|
||||
for _, v := range values {
|
||||
if t, ok := v.(*statistics.Table); ok && t != nil {
|
||||
tables = append(tables, t)
|
||||
}
|
||||
}
|
||||
return tables
|
||||
}
|
||||
|
||||
// Update updates the statistics table cache.
|
||||
func (sc *statsCache) Update(tables []*statistics.Table, deletedIDs []int64, newVersion uint64) {
|
||||
func (sc *simpleLRUStatsCache) Update(tables []*statistics.Table, deletedIDs []int64, newVersion uint64) {
|
||||
sc.mu.Lock()
|
||||
defer sc.mu.Unlock()
|
||||
if sc.version <= newVersion {
|
||||
@ -151,15 +194,22 @@ func (sc *statsCache) Update(tables []*statistics.Table, deletedIDs []int64, new
|
||||
}
|
||||
}
|
||||
|
||||
func (sc *statsCache) GetVersion() uint64 {
|
||||
// GetBytesLimit get the limits of memory.
|
||||
func (sc *simpleLRUStatsCache) GetBytesLimit() int64 {
|
||||
sc.mu.Lock()
|
||||
defer sc.mu.Unlock()
|
||||
return sc.memTracker.GetBytesLimit()
|
||||
}
|
||||
|
||||
func (sc *simpleLRUStatsCache) GetVersion() uint64 {
|
||||
sc.mu.Lock()
|
||||
defer sc.mu.Unlock()
|
||||
return sc.version
|
||||
}
|
||||
|
||||
// initStatsCache should be invoked after the tables and their stats are initialized
|
||||
// using tables map and version to init statsCache
|
||||
func (sc *statsCache) initStatsCache(tables map[int64]*statistics.Table, version uint64) {
|
||||
// InitStatsCache should be called after the tables and their stats are initilazed
|
||||
// using tables map and version to init statscache
|
||||
func (sc *simpleLRUStatsCache) InitStatsCache(tables map[int64]*statistics.Table, version uint64) {
|
||||
sc.mu.Lock()
|
||||
defer sc.mu.Unlock()
|
||||
for _, tbl := range tables {
|
||||
|
||||
@ -45,8 +45,7 @@ func (s *testStatsSuite) TestStatsCacheMiniMemoryLimit(c *C) {
|
||||
// set new BytesLimit
|
||||
BytesLimit := int64(90000)
|
||||
|
||||
do.StatsHandle().SetBytesLimit4Test(BytesLimit)
|
||||
// create t2 and kick t1 of cache
|
||||
s.do.StatsHandle().SetBytesLimit4Test(BytesLimit)
|
||||
testKit.MustExec("create table t2 (c1 int, c2 int)")
|
||||
testKit.MustExec("insert into t2 values(1, 2)")
|
||||
do = s.do
|
||||
@ -102,7 +101,6 @@ func (s *testStatsSuite) TestLoadHistWithLimit(c *C) {
|
||||
c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil)
|
||||
testKit.MustExec("analyze table t2")
|
||||
c.Assert(BytesLimit >= h.GetMemConsumed(), IsTrue)
|
||||
|
||||
}
|
||||
|
||||
func (s *testStatsSuite) TestLoadHistWithInvalidIndex(c *C) {
|
||||
@ -211,7 +209,6 @@ func (s *testStatsSuite) TestManyTableChange(c *C) {
|
||||
for _, v := range statsTblnew.Indices {
|
||||
c.Assert(v.IsInvalid(&stmtctx.StatementContext{}, false), IsFalse)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -257,6 +254,5 @@ func (s *testStatsSuite) TestManyTableChangeWithQuery(c *C) {
|
||||
for _, v := range statsTblNew.Indices {
|
||||
c.Assert(v.IsInvalid(&stmtctx.StatementContext{}, false), IsFalse)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user