// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cache

import (
	"context"
	"slices"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/config"
	"github.com/pingcap/tidb/pkg/infoschema"
	tidbmetrics "github.com/pingcap/tidb/pkg/metrics"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/statistics"
	"github.com/pingcap/tidb/pkg/statistics/handle/cache/metrics"
	statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
	handle_metrics "github.com/pingcap/tidb/pkg/statistics/handle/metrics"
	"github.com/pingcap/tidb/pkg/statistics/handle/types"
	"github.com/pingcap/tidb/pkg/statistics/handle/util"
	"github.com/pingcap/tidb/pkg/util/chunk"
	"github.com/pingcap/tidb/pkg/util/intest"
	"go.uber.org/zap"
)

// LeaseOffset represents the time offset for the stats cache to load statistics from the store.
// This value is crucial to ensure that the stats are retrieved at the correct interval.
// See GetNextCheckVersionWithOffset for how it is used.
const LeaseOffset = 5

// StatsCacheImpl implements util.StatsCache.
type StatsCacheImpl struct {
	atomic.Pointer[StatsCache]

	statsHandle types.StatsHandle
}

// NewStatsCacheImpl creates a new StatsCache.
func NewStatsCacheImpl(statsHandle types.StatsHandle) (types.StatsCache, error) {
	newCache, err := NewStatsCache()
	if err != nil {
		return nil, err
	}

	result := &StatsCacheImpl{
		statsHandle: statsHandle,
	}
	result.Store(newCache)

	return result, nil
}
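
// A minimal usage sketch (hedged: h and tid are placeholders for a ready
// types.StatsHandle and a table ID, neither of which is defined in this file):
//
//	sc, err := NewStatsCacheImpl(h)
//	if err != nil {
//		return err
//	}
//	if tbl, ok := sc.Get(tid); ok {
//		_ = tbl.RealtimeCount
//	}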

// NewStatsCacheImplForTest creates a new StatsCache for test.
func NewStatsCacheImplForTest() (types.StatsCache, error) {
	return NewStatsCacheImpl(nil)
}

// cacheOfBatchUpdate buffers tables for a batched update of the stats cache.
// We should not insert an item based on one fetched from the cache a long time ago,
// since that may make the cache inconsistent.
// An item should be modified quickly and inserted back into the cache.
type cacheOfBatchUpdate struct {
	op        func(toUpdate []*statistics.Table, toDelete []int64)
	toUpdate  []*statistics.Table
	toDelete  []int64
	batchSize int
}

const batchSizeOfUpdateBatch = 10

func (t *cacheOfBatchUpdate) internalFlush() {
	t.op(t.toUpdate, t.toDelete)
	t.toUpdate = t.toUpdate[:0]
	t.toDelete = t.toDelete[:0]
}

func (t *cacheOfBatchUpdate) addToUpdate(table *statistics.Table) {
	if len(t.toUpdate) == t.batchSize {
		t.internalFlush()
	}
	t.toUpdate = append(t.toUpdate, table)
}

func (t *cacheOfBatchUpdate) addToDelete(tableID int64) {
	if len(t.toDelete) == t.batchSize {
		t.internalFlush()
	}
	t.toDelete = append(t.toDelete, tableID)
}

func (t *cacheOfBatchUpdate) flush() {
	if len(t.toUpdate) > 0 || len(t.toDelete) > 0 {
		t.internalFlush()
	}
}

func newCacheOfBatchUpdate(batchSize int, op func(toUpdate []*statistics.Table, toDelete []int64)) cacheOfBatchUpdate {
	return cacheOfBatchUpdate{
		op:        op,
		toUpdate:  make([]*statistics.Table, 0, batchSize),
		toDelete:  make([]int64, 0, batchSize),
		batchSize: batchSize,
	}
}
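
// A minimal sketch of the intended usage pattern (hypothetical op body; callers
// must call flush at the end so the final partial batch is not lost):
//
//	b := newCacheOfBatchUpdate(batchSizeOfUpdateBatch, func(upd []*statistics.Table, del []int64) {
//		// apply upd and del to the cache in one shot
//	})
//	b.addToUpdate(tbl)
//	b.addToDelete(droppedID)
//	b.flush()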

// Update reads stats meta from store and updates the stats map.
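// When called with no tableAndPartitionIDs, it scans all stats_meta rows newer
// than the cached version; when IDs are given, only those tables are refreshed
// and the cache's max version is deliberately not advanced (see
// skipMoveForwardStatsCache below).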
func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema, tableAndPartitionIDs ...int64) error {
	onlyForAnalyzedTables := len(tableAndPartitionIDs) > 0
	start := time.Now()
	defer func() {
		dur := time.Since(start)
		tidbmetrics.StatsDeltaLoadHistogram.Observe(dur.Seconds())
	}()
	lastVersion := s.GetNextCheckVersionWithOffset()
	var (
		skipMoveForwardStatsCache bool
		rows                      []chunk.Row
		err                       error
	)
	if err := util.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
		query := "SELECT version, table_id, modify_count, count, snapshot, last_stats_histograms_version from mysql.stats_meta where version > %? "
		args := []any{lastVersion}

		if onlyForAnalyzedTables {
			// When updating specific tables, we skip incrementing the max stats version to avoid missing
			// delta updates for other tables. The max version only advances when doing a full update.
			skipMoveForwardStatsCache = true
			// Sort and deduplicate the table IDs.
			slices.Sort(tableAndPartitionIDs)
			tableAndPartitionIDs = slices.Compact(tableAndPartitionIDs)
			// Convert the table IDs to strings, since the SQL executor only accepts string arrays for IN clauses.
			tableStringIDs := make([]string, 0, len(tableAndPartitionIDs))
			for _, tableID := range tableAndPartitionIDs {
				tableStringIDs = append(tableStringIDs, strconv.FormatInt(tableID, 10))
			}
			query += "and table_id in (%?) "
			args = append(args, tableStringIDs)
		}
		query += "order by version"
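		// Sketch of the assembled statement (the %? placeholders are bound to
		// lastVersion and, when filtering, the stringified table IDs):
		//   SELECT version, table_id, modify_count, count, snapshot, last_stats_histograms_version
		//   FROM mysql.stats_meta WHERE version > %? [AND table_id IN (%?)] ORDER BY version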
		rows, _, err = util.ExecRows(sctx, query, args...)
		return err
	}); err != nil {
		return errors.Trace(err)
	}

	tblToUpdateOrDelete := newCacheOfBatchUpdate(batchSizeOfUpdateBatch, func(toUpdate []*statistics.Table, toDelete []int64) {
		s.UpdateStatsCache(types.CacheUpdate{
			Updated: toUpdate,
			Deleted: toDelete,
			Options: types.UpdateOptions{
				SkipMoveForward: skipMoveForwardStatsCache,
			},
		})
	})

	for _, row := range rows {
		version := row.GetUint64(0)
		physicalID := row.GetInt64(1)
		modifyCount := row.GetInt64(2)
		count := row.GetInt64(3)
		snapshot := row.GetUint64(4)
		var latestHistUpdateVersion uint64
		if !row.IsNull(5) {
			latestHistUpdateVersion = row.GetUint64(5)
		}

		// Detect the context cancel signal, since the loop may take a long time.
		// TODO: add context to TableInfoByID and remove this code block?
		if ctx.Err() != nil {
			return ctx.Err()
		}

		table, ok := s.statsHandle.TableInfoByID(is, physicalID)
		if !ok {
			statslogutil.StatsLogger().Debug(
				"unknown physical ID in stats meta table, maybe it has been dropped",
				zap.Int64("ID", physicalID),
			)
			tblToUpdateOrDelete.addToDelete(physicalID)
			continue
		}
		tableInfo := table.Meta()

		// If the table has not been updated, we can skip it.
		oldTbl, ok := s.Get(physicalID)
		if ok && oldTbl.Version >= version &&
			tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS {
			continue
		}
		var tbl *statistics.Table
		needLoadColAndIdxStats := true
		// If the column/index stats have not been updated, we can reuse the old table stats;
		// only the count and modify count need to be refreshed.
		if ok && latestHistUpdateVersion > 0 && oldTbl.LastStatsHistVersion >= latestHistUpdateVersion {
			tbl = oldTbl.CopyAs(statistics.MetaOnly)
			// The count and modify count are updated in finalProcess.
			needLoadColAndIdxStats = false
		}
		if needLoadColAndIdxStats {
			tbl, err = s.statsHandle.TableStatsFromStorage(
				tableInfo,
				physicalID,
				false,
				0,
			)
			// A non-nil error may mean there have been some DDL changes on this table; we do not update it.
			if err != nil {
				statslogutil.StatsLogger().Warn(
					"error occurred when reading table stats",
					zap.String("table", tableInfo.Name.O),
					zap.Error(err),
				)
				continue
			}
			if tbl == nil {
				tblToUpdateOrDelete.addToDelete(physicalID)
				continue
			}
		}
		tbl.Version = version
		tbl.LastStatsHistVersion = latestHistUpdateVersion
		tbl.RealtimeCount = count
		tbl.ModifyCount = modifyCount
		tbl.TblInfoUpdateTS = tableInfo.UpdateTS
		// A zero LastAnalyzeVersion with a non-zero snapshot only occurs in the following situations:
		// 1. The table has already been analyzed, but because the predicate columns feature is turned on
		//    and it doesn't have any columns or indexes analyzed, it only analyzes _row_id and refreshes
		//    stats_meta, in which case the snapshot is not zero.
		// 2. LastAnalyzeVersion is 0 because it has never been loaded.
		// In both cases we can initialize LastAnalyzeVersion to the snapshot;
		// otherwise auto-analyze would assume the table has never been analyzed and try to analyze it again.
		if tbl.LastAnalyzeVersion == 0 && snapshot != 0 {
			tbl.LastAnalyzeVersion = snapshot
		}
		tblToUpdateOrDelete.addToUpdate(tbl)
	}
	tblToUpdateOrDelete.flush()
	return nil
}

// GetNextCheckVersionWithOffset gets the last version with an offset.
func (s *StatsCacheImpl) GetNextCheckVersionWithOffset() uint64 {
	// Get the greatest version of the stats meta table.
	lastVersion := s.MaxTableStatsVersion()
	// We need this offset because, between two tables, the one with the smaller version may be committed
	// later than the one with the larger version. Consider two tables A and B whose (version, commit time)
	// pairs are (A0, A1) and (B0, B1), with A0 < B0 < B1 < A1. We first read the stats of B and move
	// lastVersion forward to B0, but then we could no longer see A's stats if we only read versions
	// greater than B0. A write stays visible as long as the gap between its commit time and its version
	// is less than five leases.
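	// Toy example (hypothetical numbers, just to illustrate the window): with a
	// 3s stats lease, the offset covers 5*3s = 15s of TSO time, so if the cached
	// max version maps to 12:00:15 we re-read rows versioned after roughly 12:00:00.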
	offset := util.DurationToTS(LeaseOffset * s.statsHandle.Lease())
	if lastVersion >= offset {
		lastVersion = lastVersion - offset
	} else {
		lastVersion = 0
	}

	return lastVersion
}

// Replace replaces this cache.
func (s *StatsCacheImpl) Replace(cache types.StatsCache) {
	x := cache.(*StatsCacheImpl)
	s.replace(x.Load())
}

// replace replaces the cache with the new cache.
func (s *StatsCacheImpl) replace(newCache *StatsCache) {
	old := s.Swap(newCache)
	if old != nil {
		old.Close()
	}
	metrics.CostGauge.Set(float64(newCache.Cost()))
}

// UpdateStatsCache updates the cache with the new cache.
func (s *StatsCacheImpl) UpdateStatsCache(cacheUpdate types.CacheUpdate) {
	if enableQuota := config.GetGlobalConfig().Performance.EnableStatsCacheMemQuota; enableQuota {
		s.Load().Update(cacheUpdate.Updated, cacheUpdate.Deleted, cacheUpdate.Options.SkipMoveForward)
	} else {
		// TODO: remove this branch because we will always enable quota.
		newCache := s.Load().CopyAndUpdate(cacheUpdate.Updated, cacheUpdate.Deleted)
		s.replace(newCache)
	}
}
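
// A hedged sketch of a typical call (tbl and droppedID are placeholders, not
// defined in this file):
//
//	s.UpdateStatsCache(types.CacheUpdate{
//		Updated: []*statistics.Table{tbl},
//		Deleted: []int64{droppedID},
//		Options: types.UpdateOptions{SkipMoveForward: false},
//	})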

// Close closes this cache.
func (s *StatsCacheImpl) Close() {
	s.Load().Close()
}

// Clear clears this cache.
// It creates an empty cache and replaces the old one.
func (s *StatsCacheImpl) Clear() {
	cache, err := NewStatsCache()
	if err != nil {
		statslogutil.StatsLogger().Warn("create stats cache failed", zap.Error(err))
		return
	}
	s.replace(cache)
}

// MemConsumed returns its memory usage.
func (s *StatsCacheImpl) MemConsumed() (size int64) {
	return s.Load().Cost()
}

// Get returns the specified table's stats.
func (s *StatsCacheImpl) Get(tableID int64) (*statistics.Table, bool) {
	failpoint.Inject("StatsCacheGetNil", func() {
		failpoint.Return(nil, false)
	})
	return s.Load().Get(tableID)
}

// Put puts this table's stats into the cache.
func (s *StatsCacheImpl) Put(id int64, t *statistics.Table) {
	s.Load().put(id, t)
}

// TriggerEvict triggers the cache to evict some items.
func (s *StatsCacheImpl) TriggerEvict() {
	s.Load().TriggerEvict()
}

// MaxTableStatsVersion returns the version of the current cache, which is defined as
// the max table stats version the cache has seen in its lifecycle.
func (s *StatsCacheImpl) MaxTableStatsVersion() uint64 {
	return s.Load().Version()
}

// Values returns all values in this cache.
func (s *StatsCacheImpl) Values() []*statistics.Table {
	return s.Load().Values()
}

// Len returns the length of this cache.
func (s *StatsCacheImpl) Len() int {
	return s.Load().Len()
}

// SetStatsCacheCapacity sets the cache's capacity.
func (s *StatsCacheImpl) SetStatsCacheCapacity(c int64) {
	s.Load().SetCapacity(c)
}

// UpdateStatsHealthyMetrics refreshes handle_metrics.StatsHealthyGauges. We
// treat never-analyzed tables as healthy=0, count unanalyzed tables that fall below the
// auto-analyze minimal count threshold as "unneeded analyze", and keep pseudo tables as a separate category.
// The gauges satisfy: total tables = pseudo tables + unneeded analyze tables + tables in healthy buckets.
func (s *StatsCacheImpl) UpdateStatsHealthyMetrics() {
	var buckets [handle_metrics.StatsHealthyBucketCount]int64
	for _, tbl := range s.Load().Values() {
		buckets[handle_metrics.StatsHealthyBucketTotal]++

		// Pseudo entries usually disappear after DDL processing or after table updates load
		// stats meta from storage, so you normally won't see many pseudo tables here.
		if tbl.Pseudo {
			buckets[handle_metrics.StatsHealthyBucketPseudo]++
			continue
		}
		// Even if a table is ineligible for analysis, count it in the distribution once it has been analyzed before.
		// Otherwise this metric may mislead users into thinking those tables are still unanalyzed.
		if !tbl.MeetAutoAnalyzeMinCnt() && !tbl.IsAnalyzed() {
			buckets[handle_metrics.StatsHealthyBucketUnneededAnalyze]++
			continue
		}
		// NOTE: tables that haven't been analyzed yet start from healthy=0.
		healthy, ok := tbl.GetStatsHealthy()
		if !ok {
			continue
		}
		buckets[statsHealthyBucketIndex(healthy)]++
	}
	for idx, gauge := range handle_metrics.StatsHealthyGauges {
		gauge.Set(float64(buckets[idx]))
	}
}

func statsHealthyBucketIndex(healthy int64) int {
	intest.Assert(healthy >= 0 && healthy <= 100, "healthy value out of range: %d", healthy)
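	// For illustration only: if the configured upper bounds were 50, 80, and 100,
	// a healthy value of 45 would land in the first bucket (45 < 50), while 100
	// falls through the loop and maps to StatsHealthyBucket100To100 below.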
	for _, cfg := range handle_metrics.HealthyBucketConfigs {
		if cfg.UpperBound <= 0 {
			continue
		}
		if healthy < cfg.UpperBound {
			return cfg.Index
		}
	}
	return handle_metrics.StatsHealthyBucket100To100
}