// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
"context"
"slices"
"strconv"
"sync/atomic"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/infoschema"
tidbmetrics "github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/cache/metrics"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
handle_metrics "github.com/pingcap/tidb/pkg/statistics/handle/metrics"
"github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/intest"
"go.uber.org/zap"
)

// LeaseOffset is the lease multiple used to compute the time offset at which the
// stats cache loads statistics from the store. This value is crucial to ensure
// that the stats are retrieved at the correct interval.
// See GetNextCheckVersionWithOffset for how it is used.
const LeaseOffset = 5

// StatsCacheImpl implements util.StatsCache.
type StatsCacheImpl struct {
atomic.Pointer[StatsCache]
statsHandle types.StatsHandle
}

// NewStatsCacheImpl creates a new StatsCache.
func NewStatsCacheImpl(statsHandle types.StatsHandle) (types.StatsCache, error) {
newCache, err := NewStatsCache()
if err != nil {
return nil, err
}
result := &StatsCacheImpl{
statsHandle: statsHandle,
}
result.Store(newCache)
return result, nil
}

// NewStatsCacheImplForTest creates a new StatsCache for test.
func NewStatsCacheImplForTest() (types.StatsCache, error) {
return NewStatsCacheImpl(nil)
}

// cacheOfBatchUpdate is a helper that batches updates to the stats cache.
// We should not insert an item derived from one fetched from the cache a long
// time ago, as that may leave the cache inconsistent.
// Items should be modified quickly and inserted back into the cache.
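//
// A minimal usage sketch (illustrative only; the op callback body and the
// tbl/tblID values are placeholders, not real call sites):
//
//	batch := newCacheOfBatchUpdate(batchSizeOfUpdateBatch, func(toUpdate []*statistics.Table, toDelete []int64) {
//		// apply the accumulated updates and deletions to the cache here
//	})
//	batch.addToUpdate(tbl)   // auto-flushes once batchSize entries accumulate
//	batch.addToDelete(tblID) // deletions follow the same auto-flush rule
//	batch.flush()            // final flush for any remainder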
type cacheOfBatchUpdate struct {
op func(toUpdate []*statistics.Table, toDelete []int64)
toUpdate []*statistics.Table
toDelete []int64
batchSize int
}

const batchSizeOfUpdateBatch = 10

func (t *cacheOfBatchUpdate) internalFlush() {
t.op(t.toUpdate, t.toDelete)
t.toUpdate = t.toUpdate[:0]
t.toDelete = t.toDelete[:0]
}

func (t *cacheOfBatchUpdate) addToUpdate(table *statistics.Table) {
if len(t.toUpdate) == t.batchSize {
t.internalFlush()
}
t.toUpdate = append(t.toUpdate, table)
}

func (t *cacheOfBatchUpdate) addToDelete(tableID int64) {
if len(t.toDelete) == t.batchSize {
t.internalFlush()
}
t.toDelete = append(t.toDelete, tableID)
}

func (t *cacheOfBatchUpdate) flush() {
if len(t.toUpdate) > 0 || len(t.toDelete) > 0 {
t.internalFlush()
}
}

func newCacheOfBatchUpdate(batchSize int, op func(toUpdate []*statistics.Table, toDelete []int64)) cacheOfBatchUpdate {
return cacheOfBatchUpdate{
op: op,
toUpdate: make([]*statistics.Table, 0, batchSize),
toDelete: make([]int64, 0, batchSize),
batchSize: batchSize,
}
}

// Update reads stats meta from store and updates the stats map.
func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema, tableAndPartitionIDs ...int64) error {
onlyForAnalyzedTables := len(tableAndPartitionIDs) > 0
start := time.Now()
defer func() {
dur := time.Since(start)
tidbmetrics.StatsDeltaLoadHistogram.Observe(dur.Seconds())
}()
lastVersion := s.GetNextCheckVersionWithOffset()
var (
skipMoveForwardStatsCache bool
rows []chunk.Row
err error
)
if err := util.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
query := "SELECT version, table_id, modify_count, count, snapshot, last_stats_histograms_version from mysql.stats_meta where version > %? "
args := []any{lastVersion}
if onlyForAnalyzedTables {
// When updating specific tables, we skip incrementing the max stats version to avoid missing
// delta updates for other tables. The max version only advances when doing a full update.
skipMoveForwardStatsCache = true
// Sort and deduplicate the table IDs.
slices.Sort(tableAndPartitionIDs)
tableAndPartitionIDs = slices.Compact(tableAndPartitionIDs)
// Convert table IDs to strings since the SQL executor only accepts string arrays for IN clauses
tableStringIDs := make([]string, 0, len(tableAndPartitionIDs))
for _, tableID := range tableAndPartitionIDs {
tableStringIDs = append(tableStringIDs, strconv.FormatInt(tableID, 10))
}
query += "and table_id in (%?) "
args = append(args, tableStringIDs)
}
query += "order by version"
rows, _, err = util.ExecRows(sctx, query, args...)
return err
}); err != nil {
return errors.Trace(err)
}
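// Batch the cache mutations so a large result set is applied in chunks of
// batchSizeOfUpdateBatch rather than one CacheUpdate per row; each flush
// publishes the accumulated updates and deletions in a single CacheUpdate.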
tblToUpdateOrDelete := newCacheOfBatchUpdate(batchSizeOfUpdateBatch, func(toUpdate []*statistics.Table, toDelete []int64) {
s.UpdateStatsCache(types.CacheUpdate{
Updated: toUpdate,
Deleted: toDelete,
Options: types.UpdateOptions{
SkipMoveForward: skipMoveForwardStatsCache,
},
})
})
for _, row := range rows {
version := row.GetUint64(0)
physicalID := row.GetInt64(1)
modifyCount := row.GetInt64(2)
count := row.GetInt64(3)
snapshot := row.GetUint64(4)
var latestHistUpdateVersion uint64
if !row.IsNull(5) {
latestHistUpdateVersion = row.GetUint64(5)
}
// Check for context cancellation, since this loop may take a long time.
// TODO: add context to TableInfoByID and remove this code block?
if ctx.Err() != nil {
return ctx.Err()
}
table, ok := s.statsHandle.TableInfoByID(is, physicalID)
if !ok {
statslogutil.StatsLogger().Debug(
"unknown physical ID in stats meta table, maybe it has been dropped",
zap.Int64("ID", physicalID),
)
tblToUpdateOrDelete.addToDelete(physicalID)
continue
}
tableInfo := table.Meta()
// If the table is not updated, we can skip it.
oldTbl, ok := s.Get(physicalID)
if ok && oldTbl.Version >= version &&
tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS {
continue
}
var tbl *statistics.Table
needLoadColAndIdxStats := true
// If the column/index stats have not been updated, we can reuse the old table
// stats. We only need to update the count and modify count.
if ok && latestHistUpdateVersion > 0 && oldTbl.LastStatsHistVersion >= latestHistUpdateVersion {
tbl = oldTbl.CopyAs(statistics.MetaOnly)
// count and modify count are updated in finalProcess
needLoadColAndIdxStats = false
}
if needLoadColAndIdxStats {
tbl, err = s.statsHandle.TableStatsFromStorage(
tableInfo,
physicalID,
false,
0,
)
// A non-nil error may mean there were some DDL changes on this table, so we will not update it.
if err != nil {
statslogutil.StatsLogger().Warn(
"error occurred when read table stats",
zap.String("table", tableInfo.Name.O),
zap.Error(err),
)
continue
}
if tbl == nil {
tblToUpdateOrDelete.addToDelete(physicalID)
continue
}
}
tbl.Version = version
tbl.LastStatsHistVersion = latestHistUpdateVersion
tbl.RealtimeCount = count
tbl.ModifyCount = modifyCount
tbl.TblInfoUpdateTS = tableInfo.UpdateTS
// A zero LastAnalyzeVersion with a non-zero snapshot only occurs in the following situations:
// 1. The table has already been analyzed, but because the predicate columns feature is turned on
//    and it doesn't have any columns or indexes analyzed, only _row_id was analyzed and
//    stats_meta was refreshed, in which case the snapshot is not zero.
// 2. LastAnalyzeVersion is 0 because it has never been loaded.
// In these cases, we can initialize LastAnalyzeVersion to the snapshot;
// otherwise auto-analyze will assume that the table has never been analyzed and try to analyze it again.
if tbl.LastAnalyzeVersion == 0 && snapshot != 0 {
tbl.LastAnalyzeVersion = snapshot
}
tblToUpdateOrDelete.addToUpdate(tbl)
}
tblToUpdateOrDelete.flush()
return nil
}

// GetNextCheckVersionWithOffset gets the last version with offset.
func (s *StatsCacheImpl) GetNextCheckVersionWithOffset() uint64 {
// Get the greatest version of the stats meta table.
lastVersion := s.MaxTableStatsVersion()
// We need this because, for two tables, the one with the smaller version may commit
// later than the one with the larger version. Consider two tables A and B whose
// (version, commit time) pairs are (A0, A1) and (B0, B1), with A0 < B0 < B1 < A1.
// We first read the stats of B and advance lastVersion to B0, but then we would miss
// the stats of A if we only read versions greater than lastVersion (B0).
// We can still read the stats as long as the gap between commit time and version is
// less than five leases.
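// As a concrete illustration (assuming the default 3s stats lease; the exact
// TS conversion is delegated to util.DurationToTS): the offset spans
// 5 * 3s = 15s of physical time, so versions committed up to 15s before the
// cached max version are still picked up on the next Update.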
offset := util.DurationToTS(LeaseOffset * s.statsHandle.Lease())
if lastVersion >= offset {
lastVersion = lastVersion - offset
} else {
lastVersion = 0
}
return lastVersion
}

// Replace replaces this cache.
func (s *StatsCacheImpl) Replace(cache types.StatsCache) {
x := cache.(*StatsCacheImpl)
s.replace(x.Load())
}

// replace replaces the cache with the new cache.
func (s *StatsCacheImpl) replace(newCache *StatsCache) {
old := s.Swap(newCache)
if old != nil {
old.Close()
}
metrics.CostGauge.Set(float64(newCache.Cost()))
}

// UpdateStatsCache updates the cache with the new cache.
func (s *StatsCacheImpl) UpdateStatsCache(cacheUpdate types.CacheUpdate) {
if enableQuota := config.GetGlobalConfig().Performance.EnableStatsCacheMemQuota; enableQuota {
s.Load().Update(cacheUpdate.Updated, cacheUpdate.Deleted, cacheUpdate.Options.SkipMoveForward)
} else {
// TODO: remove this branch because we will always enable quota.
newCache := s.Load().CopyAndUpdate(cacheUpdate.Updated, cacheUpdate.Deleted)
s.replace(newCache)
}
}

// Close closes this cache.
func (s *StatsCacheImpl) Close() {
s.Load().Close()
}

// Clear clears this cache.
// It creates an empty cache and replaces the old one.
func (s *StatsCacheImpl) Clear() {
cache, err := NewStatsCache()
if err != nil {
statslogutil.StatsLogger().Warn("create stats cache failed", zap.Error(err))
return
}
s.replace(cache)
}

// MemConsumed returns its memory usage.
func (s *StatsCacheImpl) MemConsumed() (size int64) {
return s.Load().Cost()
}

// Get returns the specified table's stats.
func (s *StatsCacheImpl) Get(tableID int64) (*statistics.Table, bool) {
failpoint.Inject("StatsCacheGetNil", func() {
failpoint.Return(nil, false)
})
return s.Load().Get(tableID)
}

// Put puts the given table stats into the cache.
func (s *StatsCacheImpl) Put(id int64, t *statistics.Table) {
s.Load().put(id, t)
}

// TriggerEvict triggers the cache to evict some items.
func (s *StatsCacheImpl) TriggerEvict() {
s.Load().TriggerEvict()
}

// MaxTableStatsVersion returns the version of the current cache, which is defined as
// the max table stats version the cache has in its lifecycle.
func (s *StatsCacheImpl) MaxTableStatsVersion() uint64 {
return s.Load().Version()
}

// Values returns all values in this cache.
func (s *StatsCacheImpl) Values() []*statistics.Table {
return s.Load().Values()
}

// Len returns the length of this cache.
func (s *StatsCacheImpl) Len() int {
return s.Load().Len()
}

// SetStatsCacheCapacity sets the cache's capacity.
func (s *StatsCacheImpl) SetStatsCacheCapacity(c int64) {
s.Load().SetCapacity(c)
}

// UpdateStatsHealthyMetrics refreshes handle_metrics.StatsHealthyGauges. We
// treat never-analyzed tables as healthy=0, count unanalyzed tables that fall below the
// auto-analyze minimal count threshold as "unneeded analyze", and keep pseudo tables as a separate category.
// The gauges satisfy: total tables = pseudo tables + unneeded analyze tables + tables in healthy buckets.
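// For example (hypothetical numbers, purely to illustrate the invariant): a
// cache holding 10 tables with 1 pseudo table and 2 small never-analyzed
// tables would report Total=10, Pseudo=1, UnneededAnalyze=2, with the
// remaining 7 tables distributed across the healthy-value buckets.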
func (s *StatsCacheImpl) UpdateStatsHealthyMetrics() {
var buckets [handle_metrics.StatsHealthyBucketCount]int64
for _, tbl := range s.Load().Values() {
buckets[handle_metrics.StatsHealthyBucketTotal]++
// Pseudo entries usually disappear after DDL processing or table updates load
// stats meta from storage, so usually you won't see many pseudo tables here.
if tbl.Pseudo {
buckets[handle_metrics.StatsHealthyBucketPseudo]++
continue
}
// Even if a table is ineligible for analysis, count it in the distribution if it
// has been analyzed before. Otherwise this metric may mislead users into thinking
// those tables are still unanalyzed.
if !tbl.MeetAutoAnalyzeMinCnt() && !tbl.IsAnalyzed() {
buckets[handle_metrics.StatsHealthyBucketUnneededAnalyze]++
continue
}
// NOTE: Tables that haven't been analyzed yet start from 0 healthy.
healthy, ok := tbl.GetStatsHealthy()
if !ok {
continue
}
buckets[statsHealthyBucketIndex(healthy)]++
}
for idx, gauge := range handle_metrics.StatsHealthyGauges {
gauge.Set(float64(buckets[idx]))
}
}
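
// statsHealthyBucketIndex maps a healthy value in [0, 100] to its gauge bucket.
// A sketch of the mapping, assuming HealthyBucketConfigs is ordered by ascending
// UpperBound (the concrete bounds live in handle_metrics and are not shown here):
// a value lands in the first bucket whose UpperBound it is strictly below, and a
// value of exactly 100 falls through the loop into StatsHealthyBucket100To100.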
func statsHealthyBucketIndex(healthy int64) int {
intest.Assert(healthy >= 0 && healthy <= 100, "healthy value out of range: %d", healthy)
for _, cfg := range handle_metrics.HealthyBucketConfigs {
if cfg.UpperBound <= 0 {
continue
}
if healthy < cfg.UpperBound {
return cfg.Index
}
}
return handle_metrics.StatsHealthyBucket100To100
}