statistics: split async load package (#53318)
This commit is contained in:
@ -135,6 +135,7 @@ go_library(
|
||||
"//pkg/sessiontxn",
|
||||
"//pkg/sessiontxn/staleread",
|
||||
"//pkg/statistics",
|
||||
"//pkg/statistics/asyncload",
|
||||
"//pkg/table",
|
||||
"//pkg/table/tables",
|
||||
"//pkg/table/temptable",
|
||||
|
||||
@ -22,7 +22,7 @@ import (
|
||||
"github.com/pingcap/tidb/pkg/parser/model"
|
||||
"github.com/pingcap/tidb/pkg/planner/core/base"
|
||||
"github.com/pingcap/tidb/pkg/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/pkg/statistics"
|
||||
"github.com/pingcap/tidb/pkg/statistics/asyncload"
|
||||
"github.com/pingcap/tidb/pkg/util/intset"
|
||||
"golang.org/x/exp/maps"
|
||||
)
|
||||
@ -436,7 +436,7 @@ func CollectColumnStatsUsage(lp base.LogicalPlan, predicate, histNeeded bool) (
|
||||
if histNeeded {
|
||||
collector.histNeededCols[*colToTriggerLoad] = true
|
||||
} else {
|
||||
statistics.HistogramNeededItems.Insert(*colToTriggerLoad, true)
|
||||
asyncload.AsyncLoadHistogramNeededItems.Insert(*colToTriggerLoad, true)
|
||||
}
|
||||
})
|
||||
var (
|
||||
|
||||
@ -38,6 +38,7 @@ import (
|
||||
"github.com/pingcap/tidb/pkg/sessionctx"
|
||||
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
|
||||
"github.com/pingcap/tidb/pkg/statistics"
|
||||
"github.com/pingcap/tidb/pkg/statistics/asyncload"
|
||||
"github.com/pingcap/tidb/pkg/table"
|
||||
"github.com/pingcap/tidb/pkg/types"
|
||||
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
|
||||
@ -282,7 +283,7 @@ func (ds *DataSource) initStats(colGroups [][]*expression.Column) {
|
||||
// If we enable lite stats init or we just found out the meta info of the column is missed, we need to register columns for async load.
|
||||
_, isLoadNeeded, _ := ds.statisticTable.ColumnIsLoadNeeded(col.ID, false)
|
||||
if isLoadNeeded {
|
||||
statistics.HistogramNeededItems.Insert(model.TableItemID{
|
||||
asyncload.AsyncLoadHistogramNeededItems.Insert(model.TableItemID{
|
||||
TableID: ds.tableInfo.ID,
|
||||
ID: col.ID,
|
||||
IsIndex: false,
|
||||
|
||||
@ -35,6 +35,7 @@ go_library(
|
||||
"//pkg/sessionctx",
|
||||
"//pkg/sessionctx/stmtctx",
|
||||
"//pkg/sessionctx/variable",
|
||||
"//pkg/statistics/asyncload",
|
||||
"//pkg/statistics/handle/logutil",
|
||||
"//pkg/tablecodec",
|
||||
"//pkg/types",
|
||||
|
||||
9
pkg/statistics/asyncload/BUILD.bazel
Normal file
9
pkg/statistics/asyncload/BUILD.bazel
Normal file
@ -0,0 +1,9 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "asyncload",
|
||||
srcs = ["async_load.go"],
|
||||
importpath = "github.com/pingcap/tidb/pkg/statistics/asyncload",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = ["//pkg/parser/model"],
|
||||
)
|
||||
120
pkg/statistics/asyncload/async_load.go
Normal file
120
pkg/statistics/asyncload/async_load.go
Normal file
@ -0,0 +1,120 @@
|
||||
// Copyright 2024 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package asyncload
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/pingcap/tidb/pkg/parser/model"
|
||||
)
|
||||
|
||||
// AsyncLoadHistogramNeededItems stores the columns/indices whose Histograms need to be loaded from physical kv layer.
|
||||
// Currently, we only load index/pk's Histogram from kv automatically. Columns' are loaded by needs.
|
||||
var AsyncLoadHistogramNeededItems = newNeededStatsMap()
|
||||
|
||||
type neededStatsInternalMap struct {
|
||||
// the bool value indicates whether is a full load or not.
|
||||
items map[model.TableItemID]bool
|
||||
m sync.RWMutex
|
||||
}
|
||||
|
||||
func (n *neededStatsInternalMap) AllItems() []model.StatsLoadItem {
|
||||
n.m.RLock()
|
||||
keys := make([]model.StatsLoadItem, 0, len(n.items))
|
||||
for key, val := range n.items {
|
||||
keys = append(keys, model.StatsLoadItem{
|
||||
TableItemID: key,
|
||||
FullLoad: val,
|
||||
})
|
||||
}
|
||||
n.m.RUnlock()
|
||||
return keys
|
||||
}
|
||||
|
||||
func (n *neededStatsInternalMap) Insert(col model.TableItemID, fullLoad bool) {
|
||||
n.m.Lock()
|
||||
cur := n.items[col]
|
||||
if cur {
|
||||
// If the existing one is full load. We don't need to update it.
|
||||
n.m.Unlock()
|
||||
return
|
||||
}
|
||||
n.items[col] = fullLoad
|
||||
// Otherwise, we could safely update it.
|
||||
n.m.Unlock()
|
||||
}
|
||||
|
||||
func (n *neededStatsInternalMap) Delete(col model.TableItemID) {
|
||||
n.m.Lock()
|
||||
delete(n.items, col)
|
||||
n.m.Unlock()
|
||||
}
|
||||
|
||||
func (n *neededStatsInternalMap) Length() int {
|
||||
n.m.RLock()
|
||||
defer n.m.RUnlock()
|
||||
return len(n.items)
|
||||
}
|
||||
|
||||
const shardCnt = 128
|
||||
|
||||
type neededStatsMap struct {
|
||||
items [shardCnt]neededStatsInternalMap
|
||||
}
|
||||
|
||||
func getIdx(tbl model.TableItemID) int64 {
|
||||
var id int64
|
||||
if tbl.ID < 0 {
|
||||
id = -tbl.ID
|
||||
} else {
|
||||
id = tbl.ID
|
||||
}
|
||||
return id % shardCnt
|
||||
}
|
||||
|
||||
func newNeededStatsMap() *neededStatsMap {
|
||||
result := neededStatsMap{}
|
||||
for i := 0; i < shardCnt; i++ {
|
||||
result.items[i] = neededStatsInternalMap{
|
||||
items: make(map[model.TableItemID]bool),
|
||||
}
|
||||
}
|
||||
return &result
|
||||
}
|
||||
|
||||
func (n *neededStatsMap) AllItems() []model.StatsLoadItem {
|
||||
var result []model.StatsLoadItem
|
||||
for i := 0; i < shardCnt; i++ {
|
||||
keys := n.items[i].AllItems()
|
||||
result = append(result, keys...)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (n *neededStatsMap) Insert(col model.TableItemID, fullLoad bool) {
|
||||
n.items[getIdx(col)].Insert(col, fullLoad)
|
||||
}
|
||||
|
||||
func (n *neededStatsMap) Delete(col model.TableItemID) {
|
||||
n.items[getIdx(col)].Delete(col)
|
||||
}
|
||||
|
||||
func (n *neededStatsMap) Length() int {
|
||||
var result int
|
||||
for i := 0; i < shardCnt; i++ {
|
||||
result += n.items[i].Length()
|
||||
}
|
||||
return result
|
||||
}
|
||||
@ -19,6 +19,7 @@ import (
|
||||
"github.com/pingcap/tidb/pkg/parser/mysql"
|
||||
"github.com/pingcap/tidb/pkg/planner/context"
|
||||
"github.com/pingcap/tidb/pkg/planner/util/debugtrace"
|
||||
"github.com/pingcap/tidb/pkg/statistics/asyncload"
|
||||
"github.com/pingcap/tidb/pkg/types"
|
||||
"github.com/pingcap/tidb/pkg/util/chunk"
|
||||
)
|
||||
@ -132,10 +133,6 @@ func (c *Column) MemoryUsage() CacheItemMemoryUsage {
|
||||
return columnMemUsage
|
||||
}
|
||||
|
||||
// HistogramNeededItems stores the columns/indices whose Histograms need to be loaded from physical kv layer.
|
||||
// Currently, we only load index/pk's Histogram from kv automatically. Columns' are loaded by needs.
|
||||
var HistogramNeededItems = newNeededStatsMap()
|
||||
|
||||
// ColumnStatsIsInvalid checks if this column is invalid.
|
||||
// If this column has histogram but not loaded yet,
|
||||
// then we mark it as need histogram.
|
||||
@ -161,7 +158,7 @@ func ColumnStatsIsInvalid(colStats *Column, sctx context.PlanContext, histColl *
|
||||
if (colStats == nil || !colStats.IsStatsInitialized() || colStats.IsLoadNeeded()) &&
|
||||
stmtctx != nil &&
|
||||
!histColl.CanNotTriggerLoad {
|
||||
HistogramNeededItems.Insert(model.TableItemID{
|
||||
asyncload.AsyncLoadHistogramNeededItems.Insert(model.TableItemID{
|
||||
TableID: histColl.PhysicalID,
|
||||
ID: cid,
|
||||
IsIndex: false,
|
||||
|
||||
@ -22,6 +22,7 @@ go_library(
|
||||
"//pkg/sessionctx",
|
||||
"//pkg/sessionctx/variable",
|
||||
"//pkg/statistics",
|
||||
"//pkg/statistics/asyncload",
|
||||
"//pkg/statistics/handle/cache",
|
||||
"//pkg/statistics/handle/lockstats",
|
||||
"//pkg/statistics/handle/logutil",
|
||||
|
||||
@ -27,6 +27,7 @@ import (
|
||||
"github.com/pingcap/tidb/pkg/parser/mysql"
|
||||
"github.com/pingcap/tidb/pkg/sessionctx"
|
||||
"github.com/pingcap/tidb/pkg/statistics"
|
||||
"github.com/pingcap/tidb/pkg/statistics/asyncload"
|
||||
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
|
||||
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
|
||||
"github.com/pingcap/tidb/pkg/statistics/handle/util"
|
||||
@ -539,7 +540,7 @@ func LoadHistogram(sctx sessionctx.Context, tableID int64, isIndex int, histID i
|
||||
|
||||
// LoadNeededHistograms will load histograms for those needed columns/indices.
|
||||
func LoadNeededHistograms(sctx sessionctx.Context, statsCache statstypes.StatsCache, loadFMSketch bool) (err error) {
|
||||
items := statistics.HistogramNeededItems.AllItems()
|
||||
items := asyncload.AsyncLoadHistogramNeededItems.AllItems()
|
||||
for _, item := range items {
|
||||
if !item.IsIndex {
|
||||
err = loadNeededColumnHistograms(sctx, statsCache, item.TableItemID, loadFMSketch, item.FullLoad)
|
||||
@ -556,12 +557,12 @@ func LoadNeededHistograms(sctx sessionctx.Context, statsCache statstypes.StatsCa
|
||||
|
||||
// CleanFakeItemsForShowHistInFlights cleans the invalid inserted items.
|
||||
func CleanFakeItemsForShowHistInFlights(statsCache statstypes.StatsCache) int {
|
||||
items := statistics.HistogramNeededItems.AllItems()
|
||||
items := asyncload.AsyncLoadHistogramNeededItems.AllItems()
|
||||
reallyNeeded := 0
|
||||
for _, item := range items {
|
||||
tbl, ok := statsCache.Get(item.TableID)
|
||||
if !ok {
|
||||
statistics.HistogramNeededItems.Delete(item.TableItemID)
|
||||
asyncload.AsyncLoadHistogramNeededItems.Delete(item.TableItemID)
|
||||
continue
|
||||
}
|
||||
loadNeeded := false
|
||||
@ -573,7 +574,7 @@ func CleanFakeItemsForShowHistInFlights(statsCache statstypes.StatsCache) int {
|
||||
loadNeeded = loadNeeded && analyzed
|
||||
}
|
||||
if !loadNeeded {
|
||||
statistics.HistogramNeededItems.Delete(item.TableItemID)
|
||||
asyncload.AsyncLoadHistogramNeededItems.Delete(item.TableItemID)
|
||||
continue
|
||||
}
|
||||
reallyNeeded++
|
||||
@ -589,13 +590,13 @@ func loadNeededColumnHistograms(sctx sessionctx.Context, statsCache statstypes.S
|
||||
var colInfo *model.ColumnInfo
|
||||
_, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(col.ID, true)
|
||||
if !loadNeeded || !analyzed {
|
||||
statistics.HistogramNeededItems.Delete(col)
|
||||
asyncload.AsyncLoadHistogramNeededItems.Delete(col)
|
||||
return nil
|
||||
}
|
||||
colInfo = tbl.ColAndIdxExistenceMap.GetCol(col.ID)
|
||||
hg, _, statsVer, _, err := HistMetaFromStorage(sctx, &col, colInfo)
|
||||
if hg == nil || err != nil {
|
||||
statistics.HistogramNeededItems.Delete(col)
|
||||
asyncload.AsyncLoadHistogramNeededItems.Delete(col)
|
||||
return err
|
||||
}
|
||||
var (
|
||||
@ -649,7 +650,7 @@ func loadNeededColumnHistograms(sctx sessionctx.Context, statsCache statstypes.S
|
||||
}
|
||||
tbl.Columns[col.ID] = colHist
|
||||
statsCache.UpdateStatsCache([]*statistics.Table{tbl}, nil)
|
||||
statistics.HistogramNeededItems.Delete(col)
|
||||
asyncload.AsyncLoadHistogramNeededItems.Delete(col)
|
||||
if col.IsSyncLoadFailed {
|
||||
logutil.BgLogger().Warn("Hist for column should already be loaded as sync but not found.",
|
||||
zap.Int64("table_id", colHist.PhysicalID),
|
||||
@ -666,12 +667,12 @@ func loadNeededIndexHistograms(sctx sessionctx.Context, statsCache statstypes.St
|
||||
}
|
||||
_, loadNeeded := tbl.IndexIsLoadNeeded(idx.ID)
|
||||
if !loadNeeded {
|
||||
statistics.HistogramNeededItems.Delete(idx)
|
||||
asyncload.AsyncLoadHistogramNeededItems.Delete(idx)
|
||||
return nil
|
||||
}
|
||||
hgMeta, lastAnalyzePos, statsVer, flag, err := HistMetaFromStorage(sctx, &idx, nil)
|
||||
if hgMeta == nil || err != nil {
|
||||
statistics.HistogramNeededItems.Delete(idx)
|
||||
asyncload.AsyncLoadHistogramNeededItems.Delete(idx)
|
||||
return err
|
||||
}
|
||||
idxInfo := tbl.ColAndIdxExistenceMap.GetIndex(idx.ID)
|
||||
@ -713,7 +714,7 @@ func loadNeededIndexHistograms(sctx sessionctx.Context, statsCache statstypes.St
|
||||
zap.Int64("index_id", idxHist.Info.ID),
|
||||
zap.String("index_name", idxHist.Info.Name.O))
|
||||
}
|
||||
statistics.HistogramNeededItems.Delete(idx)
|
||||
asyncload.AsyncLoadHistogramNeededItems.Delete(idx)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -88,7 +88,7 @@ func TestLoadStats(t *testing.T) {
|
||||
idx, ok = stat.Indices[idxBID]
|
||||
require.True(t, !ok || (float64(idx.CMSketch.TotalCount())+float64(idx.TopN.TotalCount())+idx.Histogram.TotalRowCount() == 0))
|
||||
require.False(t, ok && idx.IsEssentialStatsLoaded())
|
||||
// IsInvalid adds the index to HistogramNeededItems.
|
||||
// IsInvalid adds the index to AsyncLoadHistogramNeededItems.
|
||||
statistics.IndexStatsIsInvalid(testKit.Session().GetPlanCtx(), idx, &stat.HistColl, idxBID)
|
||||
require.NoError(t, h.LoadNeededHistograms())
|
||||
stat = h.GetTableStats(tableInfo)
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
"github.com/pingcap/tidb/pkg/parser/mysql"
|
||||
"github.com/pingcap/tidb/pkg/planner/context"
|
||||
"github.com/pingcap/tidb/pkg/planner/util/debugtrace"
|
||||
"github.com/pingcap/tidb/pkg/statistics/asyncload"
|
||||
"github.com/pingcap/tidb/pkg/types"
|
||||
"github.com/pingcap/tidb/pkg/util/chunk"
|
||||
"github.com/twmb/murmur3"
|
||||
@ -144,7 +145,7 @@ func IndexStatsIsInvalid(sctx context.PlanContext, idxStats *Index, coll *HistCo
|
||||
// If the given index statistics is nil or we found that the index's statistics hasn't been fully loaded, we add this index to NeededItems.
|
||||
// Also, we need to check that this HistColl has its physical ID and it is permitted to trigger the stats loading.
|
||||
if (idxStats == nil || !idxStats.IsFullLoad()) && !coll.CanNotTriggerLoad {
|
||||
HistogramNeededItems.Insert(model.TableItemID{
|
||||
asyncload.AsyncLoadHistogramNeededItems.Insert(model.TableItemID{
|
||||
TableID: coll.PhysicalID,
|
||||
ID: cid,
|
||||
IsIndex: true,
|
||||
|
||||
@ -19,7 +19,6 @@ import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/pingcap/tidb/pkg/expression"
|
||||
"github.com/pingcap/tidb/pkg/parser/model"
|
||||
@ -676,101 +675,6 @@ func (t *Table) IndexIsLoadNeeded(id int64) (*Index, bool) {
|
||||
return idx, false
|
||||
}
|
||||
|
||||
type neededStatsInternalMap struct {
|
||||
// the bool value indicates whether is a full load or not.
|
||||
items map[model.TableItemID]bool
|
||||
m sync.RWMutex
|
||||
}
|
||||
|
||||
func (n *neededStatsInternalMap) AllItems() []model.StatsLoadItem {
|
||||
n.m.RLock()
|
||||
keys := make([]model.StatsLoadItem, 0, len(n.items))
|
||||
for key, val := range n.items {
|
||||
keys = append(keys, model.StatsLoadItem{
|
||||
TableItemID: key,
|
||||
FullLoad: val,
|
||||
})
|
||||
}
|
||||
n.m.RUnlock()
|
||||
return keys
|
||||
}
|
||||
|
||||
func (n *neededStatsInternalMap) Insert(col model.TableItemID, fullLoad bool) {
|
||||
n.m.Lock()
|
||||
cur := n.items[col]
|
||||
if cur {
|
||||
// If the existing one is full load. We don't need to update it.
|
||||
n.m.Unlock()
|
||||
return
|
||||
}
|
||||
n.items[col] = fullLoad
|
||||
// Otherwise, we could safely update it.
|
||||
n.m.Unlock()
|
||||
}
|
||||
|
||||
func (n *neededStatsInternalMap) Delete(col model.TableItemID) {
|
||||
n.m.Lock()
|
||||
delete(n.items, col)
|
||||
n.m.Unlock()
|
||||
}
|
||||
|
||||
func (n *neededStatsInternalMap) Length() int {
|
||||
n.m.RLock()
|
||||
defer n.m.RUnlock()
|
||||
return len(n.items)
|
||||
}
|
||||
|
||||
const shardCnt = 128
|
||||
|
||||
type neededStatsMap struct {
|
||||
items [shardCnt]neededStatsInternalMap
|
||||
}
|
||||
|
||||
func getIdx(tbl model.TableItemID) int64 {
|
||||
var id int64
|
||||
if tbl.ID < 0 {
|
||||
id = -tbl.ID
|
||||
} else {
|
||||
id = tbl.ID
|
||||
}
|
||||
return id % shardCnt
|
||||
}
|
||||
|
||||
func newNeededStatsMap() *neededStatsMap {
|
||||
result := neededStatsMap{}
|
||||
for i := 0; i < shardCnt; i++ {
|
||||
result.items[i] = neededStatsInternalMap{
|
||||
items: make(map[model.TableItemID]bool),
|
||||
}
|
||||
}
|
||||
return &result
|
||||
}
|
||||
|
||||
func (n *neededStatsMap) AllItems() []model.StatsLoadItem {
|
||||
var result []model.StatsLoadItem
|
||||
for i := 0; i < shardCnt; i++ {
|
||||
keys := n.items[i].AllItems()
|
||||
result = append(result, keys...)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (n *neededStatsMap) Insert(col model.TableItemID, fullLoad bool) {
|
||||
n.items[getIdx(col)].Insert(col, fullLoad)
|
||||
}
|
||||
|
||||
func (n *neededStatsMap) Delete(col model.TableItemID) {
|
||||
n.items[getIdx(col)].Delete(col)
|
||||
}
|
||||
|
||||
func (n *neededStatsMap) Length() int {
|
||||
var result int
|
||||
for i := 0; i < shardCnt; i++ {
|
||||
result += n.items[i].Length()
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// RatioOfPseudoEstimate means if modifyCount / statsTblCount is greater than this ratio, we think the stats is invalid
|
||||
// and use pseudo estimation.
|
||||
var RatioOfPseudoEstimate = atomic.NewFloat64(0.7)
|
||||
|
||||
Reference in New Issue
Block a user