statistics: add an analysis job factory (#56073)

ref pingcap/tidb#55906
This commit is contained in:
Rustin
2024-09-14 05:11:25 -07:00
committed by GitHub
parent 4c2ca6f341
commit 5709ba4357
6 changed files with 906 additions and 865 deletions

View File

@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "priorityqueue",
srcs = [
"analysis_job_factory.go",
"calculator.go",
"dynamic_partitioned_table_analysis_job.go",
"interval.go",
@ -14,13 +15,18 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue",
visibility = ["//visibility:public"],
deps = [
"//pkg/meta/model",
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/statistics/handle/autoanalyze/exec",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
"//pkg/util/intest",
"//pkg/util/timeutil",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_zap//:zap",
],
)
@ -29,6 +35,7 @@ go_test(
name = "priorityqueue_test",
timeout = "short",
srcs = [
"analysis_job_factory_test.go",
"calculator_test.go",
"dynamic_partitioned_table_analysis_job_test.go",
"interval_test.go",
@ -39,15 +46,18 @@ go_test(
"static_partitioned_table_analysis_job_test.go",
],
flaky = True,
shard_count = 22,
shard_count = 28,
deps = [
":priorityqueue",
"//pkg/meta/model",
"//pkg/parser/model",
"//pkg/session",
"//pkg/sessionctx",
"//pkg/statistics",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_goleak//:goleak",
],
)

View File

@ -0,0 +1,382 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package priorityqueue
import (
"time"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/timeutil"
"github.com/tikv/client-go/v2/oracle"
)
// unanalyzedTableDefaultChangePercentage is the change percentage reported for a
// table that has not been analyzed yet.
const unanalyzedTableDefaultChangePercentage = 1

// unanalyzedTableDefaultLastUpdateDuration is the offset added to the current time
// to fabricate a "last analyze time" for a table that has not been analyzed yet.
// It is negative, so the fabricated time lies 30 minutes in the past.
const unanalyzedTableDefaultLastUpdateDuration = -(30 * time.Minute)
// AnalysisJobFactory is responsible for creating different types of analysis jobs.
// It bundles the inputs its methods read while building a job: the session context,
// the auto-analyze ratio and the current TSO.
// NOTE: This struct is not thread-safe.
type AnalysisJobFactory struct {
	// sctx provides the session variables; the Create* methods read AnalyzeVersion from it.
	sctx sessionctx.Context
	// autoAnalyzeRatio is the threshold a table's change percentage must exceed.
	// A value of 0 makes CalculateChangePercentage return 0 for analyzed tables.
	autoAnalyzeRatio float64
	// The current TSO.
	currentTs uint64
}
// NewAnalysisJobFactory returns a factory that builds analysis jobs using the given
// session context, auto-analyze ratio and current TSO.
func NewAnalysisJobFactory(sctx sessionctx.Context, autoAnalyzeRatio float64, currentTs uint64) *AnalysisJobFactory {
	factory := new(AnalysisJobFactory)
	factory.sctx = sctx
	factory.autoAnalyzeRatio = autoAnalyzeRatio
	factory.currentTs = currentTs
	return factory
}
// CreateNonPartitionedTableAnalysisJob builds the analysis job for a table without partitions.
//
// It returns nil when the stats are not eligible for analysis, or when the change
// percentage is 0 and no index is missing statistics.
func (f *AnalysisJobFactory) CreateNonPartitionedTableAnalysisJob(
	tableSchema string,
	tblInfo *model.TableInfo,
	tblStats *statistics.Table,
) AnalysisJob {
	if !tblStats.IsEligibleForAnalysis() {
		return nil
	}

	// Start from the session's analyze version; CheckAnalyzeVerOnTable receives a
	// pointer to it together with the table stats.
	statsVer := f.sctx.GetSessionVars().AnalyzeVersion
	statistics.CheckAnalyzeVerOnTable(tblStats, &statsVer)

	changePct := f.CalculateChangePercentage(tblStats)
	size := f.CalculateTableSize(tblStats)
	sinceLastAnalyze := f.GetTableLastAnalyzeDuration(tblStats)
	missingIndexes := f.CheckIndexesNeedAnalyze(tblInfo, tblStats)

	// The two conditions are checked separately on purpose: users may set the auto
	// analyze ratio to 0 and still want newly added indexes and never-analyzed
	// tables to be analyzed.
	if hasWork := changePct != 0 || len(missingIndexes) > 0; !hasWork {
		return nil
	}

	return NewNonPartitionedTableAnalysisJob(
		tableSchema,
		tblInfo.Name.O,
		tblInfo.ID,
		missingIndexes,
		statsVer,
		changePct,
		size,
		sinceLastAnalyze,
	)
}
// CreateStaticPartitionAnalysisJob builds the analysis job for a single partition,
// identified by partitionID and partitionName, of the table described by globalTblInfo.
//
// It returns nil when the partition stats are not eligible for analysis, or when the
// change percentage is 0 and no index is missing statistics.
func (f *AnalysisJobFactory) CreateStaticPartitionAnalysisJob(
	tableSchema string,
	globalTblInfo *model.TableInfo,
	partitionID int64,
	partitionName string,
	partitionStats *statistics.Table,
) AnalysisJob {
	if !partitionStats.IsEligibleForAnalysis() {
		return nil
	}

	// Start from the session's analyze version; CheckAnalyzeVerOnTable receives a
	// pointer to it together with the partition stats.
	statsVer := f.sctx.GetSessionVars().AnalyzeVersion
	statistics.CheckAnalyzeVerOnTable(partitionStats, &statsVer)

	changePct := f.CalculateChangePercentage(partitionStats)
	size := f.CalculateTableSize(partitionStats)
	sinceLastAnalyze := f.GetTableLastAnalyzeDuration(partitionStats)
	missingIndexes := f.CheckIndexesNeedAnalyze(globalTblInfo, partitionStats)

	// The two conditions are checked separately on purpose: users may set the auto
	// analyze ratio to 0 and still want newly added indexes and never-analyzed
	// tables to be analyzed.
	if hasWork := changePct != 0 || len(missingIndexes) > 0; !hasWork {
		return nil
	}

	return NewStaticPartitionTableAnalysisJob(
		tableSchema,
		globalTblInfo.Name.O,
		globalTblInfo.ID,
		partitionName,
		partitionID,
		missingIndexes,
		statsVer,
		changePct,
		size,
		sinceLastAnalyze,
	)
}
// CreateDynamicPartitionedTableAnalysisJob builds one analysis job covering a whole
// partitioned table.
//
// It returns nil when the global stats are not eligible for analysis, or when no
// partition was selected by CalculateIndicatorsForPartitions and no index is missing
// statistics in any partition.
func (f *AnalysisJobFactory) CreateDynamicPartitionedTableAnalysisJob(
	tableSchema string,
	globalTblInfo *model.TableInfo,
	globalTblStats *statistics.Table,
	partitionStats map[PartitionIDAndName]*statistics.Table,
) AnalysisJob {
	if !globalTblStats.IsEligibleForAnalysis() {
		return nil
	}

	// TODO: figure out how to check the table stats version correctly for partitioned tables.
	statsVer := f.sctx.GetSessionVars().AnalyzeVersion
	statistics.CheckAnalyzeVerOnTable(globalTblStats, &statsVer)

	// All three indicators are averages over the selected partitions.
	avgChange, avgSize, avgLastAnalyzeDuration, selectedPartitions :=
		f.CalculateIndicatorsForPartitions(globalTblStats, partitionStats)
	indexesByPartition := f.CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable(globalTblInfo, partitionStats)

	// The two conditions are checked separately on purpose: users may set the auto
	// analyze ratio to 0 and still want newly added indexes and never-analyzed
	// tables to be analyzed.
	if hasWork := len(selectedPartitions) > 0 || len(indexesByPartition) > 0; !hasWork {
		return nil
	}

	return NewDynamicPartitionedTableAnalysisJob(
		tableSchema,
		globalTblInfo.Name.O,
		globalTblInfo.ID,
		selectedPartitions,
		indexesByPartition,
		statsVer,
		avgChange,
		avgSize,
		avgLastAnalyzeDuration,
	)
}
// CalculateChangePercentage returns the ratio of modified rows to the table's row count.
//
// The result is:
//   - unanalyzedTableDefaultChangePercentage when the table has not been analyzed;
//   - 0 when the factory's auto-analyze ratio is 0 (change-based analysis disabled);
//   - the computed ratio when it is strictly greater than the auto-analyze ratio;
//   - 0 otherwise.
//
// NOTE(review): if both the analyze row count and RealtimeCount are 0 the division
// below produces NaN or +Inf (IEEE 754). The Create* methods call
// IsEligibleForAnalysis first — confirm that rules out a zero count.
func (f *AnalysisJobFactory) CalculateChangePercentage(tblStats *statistics.Table) float64 {
	switch {
	case !tblStats.IsAnalyzed():
		return unanalyzedTableDefaultChangePercentage
	case f.autoAnalyzeRatio == 0:
		// Disabling the ratio only turns off change-based analysis. Index checks are
		// done elsewhere and are not affected by this early return.
		return 0
	}

	// Prefer the row count recorded at analyze time; fall back to the realtime count.
	baseCnt := float64(tblStats.RealtimeCount)
	if analyzedCnt := tblStats.GetAnalyzeRowCount(); analyzedCnt > 0 {
		baseCnt = analyzedCnt
	}

	if ratio := float64(tblStats.ModifyCount) / baseCnt; ratio > f.autoAnalyzeRatio {
		return ratio
	}
	return 0
}
// CalculateTableSize returns the table size, defined as the realtime row count
// multiplied by the number of columns recorded in ColAndIdxExistenceMap.
func (*AnalysisJobFactory) CalculateTableSize(tblStats *statistics.Table) float64 {
	rows := float64(tblStats.RealtimeCount)
	cols := tblStats.ColAndIdxExistenceMap.ColNum()
	intest.Assert(cols != 0, "Column count should not be 0")
	return rows * float64(cols)
}
// GetTableLastAnalyzeDuration returns how much time has passed between the table's
// last analyze time (see FindLastAnalyzeTime) and the factory's current TSO.
func (f *AnalysisJobFactory) GetTableLastAnalyzeDuration(tblStats *statistics.Table) time.Duration {
	lastAnalyzedAt := f.FindLastAnalyzeTime(tblStats)
	return oracle.GetTimeFromTS(f.currentTs).Sub(lastAnalyzedAt)
}
// FindLastAnalyzeTime returns the time at which the table was last analyzed.
//
// For an analyzed table the time is derived from `LastAnalyzeVersion`.
// For a table that has not been analyzed there is no such time, so the current TSO
// shifted by unanalyzedTableDefaultLastUpdateDuration is returned instead.
func (f *AnalysisJobFactory) FindLastAnalyzeTime(tblStats *statistics.Table) time.Time {
	if tblStats.IsAnalyzed() {
		return oracle.GetTimeFromTS(tblStats.LastAnalyzeVersion)
	}
	now := oracle.GetTimeFromTS(f.currentTs)
	return now.Add(unanalyzedTableDefaultLastUpdateDuration)
}
// CheckIndexesNeedAnalyze returns the names of the public indexes of tblInfo that
// have no statistics in tblStats.
//
// It returns nil for a table that has not been analyzed: such a table is analyzed as
// a whole, so there is nothing to report per index.
func (*AnalysisJobFactory) CheckIndexesNeedAnalyze(tblInfo *model.TableInfo, tblStats *statistics.Table) []string {
	if !tblStats.IsAnalyzed() {
		return nil
	}

	missing := make([]string, 0, len(tblInfo.Indices))
	for _, index := range tblInfo.Indices {
		// Stats for this index are already present.
		if tblStats.GetIdx(index.ID) != nil {
			continue
		}
		// The existence map records the index as analyzed.
		if tblStats.ColAndIdxExistenceMap.HasAnalyzed(index.ID, true) {
			continue
		}
		// Only public indexes are reported.
		if index.State != model.StatePublic {
			continue
		}
		missing = append(missing, index.Name.O)
	}
	return missing
}
// CalculateIndicatorsForPartitions computes averaged indicators over the partitions
// whose change percentage is non-zero, and returns the names of those partitions.
//
//   - avgChange: average of CalculateChangePercentage over the selected partitions.
//   - avgSize: average of (partition realtime row count * global column count).
//   - avgLastAnalyzeDuration: average of GetTableLastAnalyzeDuration.
//   - partitionNames: names of the selected partitions, in map iteration order.
//
// When no partition is selected all indicators are 0 and partitionNames is empty
// (non-nil).
func (f *AnalysisJobFactory) CalculateIndicatorsForPartitions(
	globalStats *statistics.Table,
	partitionStats map[PartitionIDAndName]*statistics.Table,
) (
	avgChange float64,
	avgSize float64,
	avgLastAnalyzeDuration time.Duration,
	partitionNames []string,
) {
	partitionNames = make([]string, 0, len(partitionStats))
	colNum := float64(globalStats.ColAndIdxExistenceMap.ColNum())
	intest.Assert(colNum != 0, "Column count should not be 0")

	var (
		changeSum   float64
		sizeSum     float64
		durationSum time.Duration
	)
	for partition, stats := range partitionStats {
		// A zero change percentage means the partition is below the threshold or the
		// auto analyze ratio is set to 0 by the user; skip it.
		pct := f.CalculateChangePercentage(stats)
		if pct == 0 {
			continue
		}
		changeSum += pct
		// size = count * cols
		sizeSum += float64(stats.RealtimeCount) * colNum
		durationSum += f.GetTableLastAnalyzeDuration(stats)
		partitionNames = append(partitionNames, partition.Name)
	}

	selected := len(partitionNames)
	if selected == 0 {
		return 0, 0, 0, partitionNames
	}

	avgChange = changeSum / float64(selected)
	avgSize = sizeSum / float64(selected)
	avgLastAnalyzeDuration = durationSum / time.Duration(selected)
	return avgChange, avgSize, avgLastAnalyzeDuration, partitionNames
}
// CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable reports, for every public index
// of the partitioned table, which partitions have no statistics for it.
// The result maps an index name to the names of the partitions that need to analyze
// that index; indexes with no such partition are left out of the map.
// NOTE: This is only for newly added indexes.
func (*AnalysisJobFactory) CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable(
	tblInfo *model.TableInfo,
	partitionStats map[PartitionIDAndName]*statistics.Table,
) map[string][]string {
	result := make(map[string][]string, len(tblInfo.Indices))
	for _, index := range tblInfo.Indices {
		// Non-public indexes are never analyzed.
		if index.State != model.StatePublic {
			continue
		}

		// Collect every partition that lacks stats for this index.
		lacking := make([]string, 0, len(partitionStats))
		for partition, stats := range partitionStats {
			if stats.GetIdx(index.ID) != nil {
				continue
			}
			if stats.ColAndIdxExistenceMap.HasAnalyzed(index.ID, true) {
				continue
			}
			lacking = append(lacking, partition.Name)
		}

		if len(lacking) == 0 {
			continue
		}
		result[index.Name.O] = lacking
	}
	return result
}
// PartitionIDAndName pairs a partition's name with its ID.
// Exported for testing purposes. Do not use it in other packages.
type PartitionIDAndName struct {
	// Name is the partition name.
	Name string
	// ID is the partition ID.
	ID int64
}

// NewPartitionIDAndName builds a PartitionIDAndName from the given name and ID.
func NewPartitionIDAndName(name string, id int64) PartitionIDAndName {
	var p PartitionIDAndName
	p.Name = name
	p.ID = id
	return p
}
// GetPartitionStats loads the auto-analyze stats of every partition in defs and
// returns them keyed by partition name and ID.
// Partitions whose stats are not eligible for analysis are left out of the result.
func GetPartitionStats(
	statsHandle statstypes.StatsHandle,
	tblInfo *model.TableInfo,
	defs []model.PartitionDefinition,
) map[PartitionIDAndName]*statistics.Table {
	result := make(map[PartitionIDAndName]*statistics.Table, len(defs))
	for _, def := range defs {
		stats := statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, def.ID)
		// Keep only the partitions that are ready to be analyzed.
		if stats.IsEligibleForAnalysis() {
			result[NewPartitionIDAndName(def.Name.O, def.ID)] = stats
		}
	}
	return result
}
// AutoAnalysisTimeWindow holds the start and end time of the auto analyze time window.
type AutoAnalysisTimeWindow struct {
	// start is the beginning of the window.
	start time.Time
	// end is the end of the window.
	end time.Time
}

// NewAutoAnalysisTimeWindow builds an AutoAnalysisTimeWindow from the given bounds.
func NewAutoAnalysisTimeWindow(start, end time.Time) AutoAnalysisTimeWindow {
	var window AutoAnalysisTimeWindow
	window.start = start
	window.end = end
	return window
}
// IsWithinTimeWindow reports whether currentTime falls inside the time window.
// It returns false when either bound of the window is the zero time (window not set);
// otherwise the decision is delegated to timeutil.WithinDayTimePeriod.
func (a AutoAnalysisTimeWindow) IsWithinTimeWindow(currentTime time.Time) bool {
	windowSet := !a.start.IsZero() && !a.end.IsZero()
	return windowSet && timeutil.WithinDayTimePeriod(a.start, a.end, currentTime)
}

View File

@ -0,0 +1,479 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package priorityqueue_test
import (
"sort"
"testing"
"time"
"github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)
// TestCalculateChangePercentage covers the four outcomes of CalculateChangePercentage:
// unanalyzed table, ratio above the threshold, ratio below the threshold, and
// auto-analyze ratio set to 0.
func TestCalculateChangePercentage(t *testing.T) {
	// analyzedTable builds stats for an analyzed table with 100 rows and the given
	// number of modified rows.
	analyzedTable := func(modifyCount int64) *statistics.Table {
		return &statistics.Table{
			HistColl:              *statistics.NewHistCollWithColsAndIdxs(0, false, 100, modifyCount, nil, nil),
			ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMap(1, 1),
			LastAnalyzeVersion:    1,
		}
	}
	unanalyzedTable := &statistics.Table{
		HistColl:              *statistics.NewHistCollWithColsAndIdxs(0, false, statistics.AutoAnalyzeMinCnt+1, 0, nil, nil),
		ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMap(0, 0),
	}

	cases := []struct {
		name  string
		stats *statistics.Table
		ratio float64
		want  float64
	}{
		{"Unanalyzed table", unanalyzedTable, 0.5, 1},
		{"Analyzed table with change percentage above threshold", analyzedTable(60), 0.5, 0.6},
		{"Analyzed table with change percentage below threshold", analyzedTable(40), 0.5, 0},
		{"Auto analyze ratio set to 0", analyzedTable(60), 0, 0},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := priorityqueue.NewAnalysisJobFactory(nil, tc.ratio, 0).CalculateChangePercentage(tc.stats)
			require.InDelta(t, tc.want, got, 0.001)
		})
	}
}
func TestGetTableLastAnalyzeDuration(t *testing.T) {
tests := []struct {
name string
tblStats *statistics.Table
currentTs uint64
wantDuration time.Duration
}{
{
name: "Analyzed table",
tblStats: &statistics.Table{
LastAnalyzeVersion: oracle.GoTimeToTS(time.Now().Add(-24 * time.Hour)),
},
currentTs: oracle.GoTimeToTS(time.Now()),
wantDuration: 24 * time.Hour,
},
{
name: "Unanalyzed table",
tblStats: &statistics.Table{
HistColl: statistics.HistColl{},
},
currentTs: oracle.GoTimeToTS(time.Now()),
wantDuration: 30 * time.Minute,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := priorityqueue.NewAnalysisJobFactory(nil, 0, tt.currentTs)
got := factory.GetTableLastAnalyzeDuration(tt.tblStats)
require.InDelta(t, tt.wantDuration, got, float64(time.Second))
})
}
}
// TestCheckIndexesNeedAnalyze checks that no index is reported for an unanalyzed
// table and that a public index without stats is reported for an analyzed one.
func TestCheckIndexesNeedAnalyze(t *testing.T) {
	// Existence map with column 1 analyzed and index 1 present but not analyzed.
	analyzedMap := statistics.NewColAndIndexExistenceMap(1, 0)
	analyzedMap.InsertCol(1, nil, true)
	analyzedMap.InsertIndex(1, nil, false)

	// Both cases use a table with a single public index named "index1".
	singleIndexTable := func() *model.TableInfo {
		return &model.TableInfo{
			Indices: []*model.IndexInfo{
				{ID: 1, Name: pmodel.NewCIStr("index1"), State: model.StatePublic},
			},
		}
	}

	cases := []struct {
		name  string
		info  *model.TableInfo
		stats *statistics.Table
		want  []string
	}{
		{
			"Test Table not analyzed",
			singleIndexTable(),
			&statistics.Table{ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMap(0, 0)},
			nil,
		},
		{
			"Test Index not analyzed",
			singleIndexTable(),
			&statistics.Table{
				HistColl: *statistics.NewHistCollWithColsAndIdxs(0, false, 0, 0, map[int64]*statistics.Column{
					1: {StatsVer: 2},
				}, nil),
				ColAndIdxExistenceMap: analyzedMap,
				LastAnalyzeVersion:    1,
			},
			[]string{"index1"},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := priorityqueue.NewAnalysisJobFactory(nil, 0, 0).CheckIndexesNeedAnalyze(tc.info, tc.stats)
			require.Equal(t, tc.want, got)
		})
	}
}
// TestCalculateIndicatorsForPartitions checks the averaged indicators and the selected
// partition names for three situations: all partitions unanalyzed, exactly one
// partition above the threshold, and no partition above the threshold.
func TestCalculateIndicatorsForPartitions(t *testing.T) {
	// 2024-01-01 10:00:00
	currentTime := time.Date(2024, 1, 1, 10, 0, 0, 0, time.UTC)
	currentTs := oracle.GoTimeToTS(currentTime)
	// 2023-12-31 10:00:00
	lastUpdateTime := time.Date(2023, 12, 31, 10, 0, 0, 0, time.UTC)
	lastUpdateTs := oracle.GoTimeToTS(lastUpdateTime)

	unanalyzedMap := statistics.NewColAndIndexExistenceMap(0, 0)
	analyzedMap := statistics.NewColAndIndexExistenceMap(2, 1)
	analyzedMap.InsertCol(1, nil, true)
	analyzedMap.InsertCol(2, nil, true)
	analyzedMap.InsertIndex(1, nil, true)

	// Every partition fixture has this many rows.
	rowCount := int64(statistics.AutoAnalyzeMinCnt + 1)

	// unanalyzedPartition builds stats for a partition that has never been analyzed.
	unanalyzedPartition := func() *statistics.Table {
		return &statistics.Table{
			HistColl: statistics.HistColl{
				Pseudo:        false,
				RealtimeCount: rowCount,
			},
			ColAndIdxExistenceMap: unanalyzedMap,
		}
	}
	// analyzedPartition builds stats for a two-column partition last analyzed at
	// lastUpdateTs, with the given number of modified rows.
	analyzedPartition := func(modifyCount int64) *statistics.Table {
		cols := map[int64]*statistics.Column{
			1: {
				StatsVer: 2,
				Histogram: statistics.Histogram{
					LastUpdateVersion: lastUpdateTs,
				},
			},
			2: {
				StatsVer: 2,
				Histogram: statistics.Histogram{
					LastUpdateVersion: lastUpdateTs,
				},
			},
		}
		return &statistics.Table{
			HistColl:              *statistics.NewHistCollWithColsAndIdxs(0, false, rowCount, modifyCount, cols, nil),
			Version:               currentTs,
			ColAndIdxExistenceMap: analyzedMap,
			LastAnalyzeVersion:    lastUpdateTs,
		}
	}

	p0 := priorityqueue.NewPartitionIDAndName("p0", 1)
	p1 := priorityqueue.NewPartitionIDAndName("p1", 2)

	tests := []struct {
		name                       string
		globalStats                *statistics.Table
		partitionStats             map[priorityqueue.PartitionIDAndName]*statistics.Table
		autoAnalyzeRatio           float64
		currentTs                  uint64
		wantAvgChangePercentage    float64
		wantAvgSize                float64
		wantAvgLastAnalyzeDuration time.Duration
		wantPartitions             []string
	}{
		{
			name: "Test Table not analyzed",
			globalStats: &statistics.Table{
				ColAndIdxExistenceMap: analyzedMap,
			},
			partitionStats: map[priorityqueue.PartitionIDAndName]*statistics.Table{
				p0: unanalyzedPartition(),
				p1: unanalyzedPartition(),
			},
			autoAnalyzeRatio:           0.5,
			currentTs:                  currentTs,
			wantAvgChangePercentage:    1,
			wantAvgSize:                2002,
			wantAvgLastAnalyzeDuration: 1800 * time.Second,
			wantPartitions:             []string{"p0", "p1"},
		},
		{
			name: "Test Table analyzed and only one partition meets the threshold",
			globalStats: &statistics.Table{
				ColAndIdxExistenceMap: analyzedMap,
			},
			partitionStats: map[priorityqueue.PartitionIDAndName]*statistics.Table{
				// Twice as many modifications as rows: change percentage 2.
				p0: analyzedPartition(rowCount * 2),
				// No modifications: below the threshold.
				p1: analyzedPartition(0),
			},
			autoAnalyzeRatio:           0.5,
			currentTs:                  currentTs,
			wantAvgChangePercentage:    2,
			wantAvgSize:                2002,
			wantAvgLastAnalyzeDuration: 24 * time.Hour,
			wantPartitions:             []string{"p0"},
		},
		{
			name: "No partition meets the threshold",
			globalStats: &statistics.Table{
				ColAndIdxExistenceMap: analyzedMap,
			},
			partitionStats: map[priorityqueue.PartitionIDAndName]*statistics.Table{
				p0: analyzedPartition(0),
				p1: analyzedPartition(0),
			},
			autoAnalyzeRatio:           0.5,
			currentTs:                  currentTs,
			wantAvgChangePercentage:    0,
			wantAvgSize:                0,
			wantAvgLastAnalyzeDuration: 0,
			wantPartitions:             []string{},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			factory := priorityqueue.NewAnalysisJobFactory(nil, tt.autoAnalyzeRatio, tt.currentTs)
			gotAvgChangePercentage,
				gotAvgSize,
				gotAvgLastAnalyzeDuration,
				gotPartitions :=
				factory.CalculateIndicatorsForPartitions(
					tt.globalStats,
					tt.partitionStats,
				)
			require.Equal(t, tt.wantAvgChangePercentage, gotAvgChangePercentage)
			require.Equal(t, tt.wantAvgSize, gotAvgSize)
			require.Equal(t, tt.wantAvgLastAnalyzeDuration, gotAvgLastAnalyzeDuration)
			// Partition names come out in map iteration order; sort both sides
			// before comparing.
			sort.Strings(tt.wantPartitions)
			sort.Strings(gotPartitions)
			require.Equal(t, tt.wantPartitions, gotPartitions)
		})
	}
}
// TestCheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable checks the index-to-partitions
// map for a table with two public indexes: p0 has stats for neither index, p1 has
// stats only for index2.
func TestCheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable(t *testing.T) {
	publicIndex := func(id int64, name string) *model.IndexInfo {
		return &model.IndexInfo{ID: id, Name: pmodel.NewCIStr(name), State: model.StatePublic}
	}
	tblInfo := model.TableInfo{
		Indices: []*model.IndexInfo{publicIndex(1, "index1"), publicIndex(2, "index2")},
		Columns: []*model.ColumnInfo{{ID: 1}, {ID: 2}},
	}

	statsWithoutIndexes := &statistics.Table{
		HistColl:              *statistics.NewHistCollWithColsAndIdxs(0, false, statistics.AutoAnalyzeMinCnt+1, 0, nil, map[int64]*statistics.Index{}),
		ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMap(0, 0),
	}
	statsWithIndex2 := &statistics.Table{
		HistColl: *statistics.NewHistCollWithColsAndIdxs(0, false, statistics.AutoAnalyzeMinCnt+1, 0, nil, map[int64]*statistics.Index{
			2: {StatsVer: 2},
		}),
		ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMap(0, 1),
	}
	partitionStats := map[priorityqueue.PartitionIDAndName]*statistics.Table{
		priorityqueue.NewPartitionIDAndName("p0", 1): statsWithoutIndexes,
		priorityqueue.NewPartitionIDAndName("p1", 2): statsWithIndex2,
	}

	got := priorityqueue.NewAnalysisJobFactory(nil, 0, 0).
		CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable(&tblInfo, partitionStats)

	want := map[string][]string{"index1": {"p0", "p1"}, "index2": {"p0"}}
	require.Equal(t, len(want), len(got))
	for indexName, wantPartitions := range want {
		gotPartitions, found := got[indexName]
		if !found {
			require.Fail(t, "key not found in partitionIndexes: "+indexName)
		}
		// Partition names come out in map iteration order; sort before comparing.
		sort.Strings(wantPartitions)
		sort.Strings(gotPartitions)
		require.Equal(t, wantPartitions, gotPartitions)
	}
}
// TestAutoAnalysisTimeWindow checks IsWithinTimeWindow for a time inside the window,
// a time outside the window, and a window whose bounds are unset.
func TestAutoAnalysisTimeWindow(t *testing.T) {
	// at returns the given hour of 2024-01-01 in UTC.
	at := func(hour int) time.Time {
		return time.Date(2024, 1, 1, hour, 0, 0, 0, time.UTC)
	}
	var unset time.Time

	cases := []struct {
		name    string
		start   time.Time
		end     time.Time
		current time.Time
		want    bool
	}{
		{"Within time window", at(1), at(5), at(3), true},
		{"Outside time window", at(1), at(5), at(6), false},
		{"Empty time window", unset, unset, time.Now(), false},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := priorityqueue.NewAutoAnalysisTimeWindow(tc.start, tc.end).IsWithinTimeWindow(tc.current)
			require.Equal(t, tc.want, got)
		})
	}
}

View File

@ -14,7 +14,6 @@ go_library(
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/statistics/handle/autoanalyze/exec",
"//pkg/statistics/handle/autoanalyze/priorityqueue",
"//pkg/statistics/handle/lockstats",
@ -23,8 +22,6 @@ go_library(
"//pkg/statistics/handle/util",
"//pkg/util",
"//pkg/util/intest",
"//pkg/util/timeutil",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_zap//:zap",
],
)
@ -38,10 +35,9 @@ go_test(
"worker_test.go",
],
flaky = True,
shard_count = 15,
shard_count = 9,
deps = [
":refresher",
"//pkg/meta/model",
"//pkg/parser/model",
"//pkg/sessionctx",
"//pkg/sessionctx/sysproctrack",
@ -51,7 +47,6 @@ go_test(
"//pkg/testkit",
"//pkg/testkit/testsetup",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@org_uber_go_goleak//:goleak",
],
)

View File

@ -23,7 +23,6 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/sysproctrack"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
@ -32,25 +31,16 @@ import (
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/timeutil"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)
const (
// unanalyzedTableDefaultChangePercentage is the default change percentage of unanalyzed table.
unanalyzedTableDefaultChangePercentage = 1
// unanalyzedTableDefaultLastUpdateDuration is the default last update duration of unanalyzed table.
unanalyzedTableDefaultLastUpdateDuration = -30 * time.Minute
)
// Refresher provides methods to refresh stats info.
// NOTE: Refresher is not thread-safe.
type Refresher struct {
statsHandle statstypes.StatsHandle
sysProcTracker sysproctrack.Tracker
// This will be refreshed every time we rebuild the priority queue.
autoAnalysisTimeWindow
autoAnalysisTimeWindow priorityqueue.AutoAnalysisTimeWindow
// Jobs is the priority queue of analysis jobs.
// Exported for testing purposes.
@ -84,7 +74,7 @@ func (r *Refresher) UpdateConcurrency() {
// AnalyzeHighestPriorityTables picks tables with the highest priority and analyzes them.
func (r *Refresher) AnalyzeHighestPriorityTables() bool {
if !r.autoAnalysisTimeWindow.isWithinTimeWindow(time.Now()) {
if !r.autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) {
return false
}
@ -182,11 +172,8 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error {
}
// We will check it again when we try to execute the job.
// So store the time window for later use.
r.autoAnalysisTimeWindow = autoAnalysisTimeWindow{
start: start,
end: end,
}
if !r.autoAnalysisTimeWindow.isWithinTimeWindow(time.Now()) {
r.autoAnalysisTimeWindow = priorityqueue.NewAutoAnalysisTimeWindow(start, end)
if !r.autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) {
return nil
}
calculator := priorityqueue.NewPriorityCalculator()
@ -204,11 +191,13 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error {
return err
}
jobFactory := priorityqueue.NewAnalysisJobFactory(sctx, autoAnalyzeRatio, currentTs)
dbs := is.AllSchemaNames()
for _, db := range dbs {
// Sometimes the tables are too many. Auto-analyze will take too much time on it.
// so we need to check the available time.
if !r.autoAnalysisTimeWindow.isWithinTimeWindow(time.Now()) {
if !r.autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) {
return nil
}
// Ignore the memory and system database.
@ -231,37 +220,13 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error {
continue
}
pi := tblInfo.GetPartitionInfo()
pushJobFunc := func(job priorityqueue.AnalysisJob) {
if job == nil {
return
}
// Calculate the weight of the job.
weight := calculator.CalculateWeight(job)
// We apply a penalty to larger tables, which can potentially result in a negative weight.
// To prevent this, we filter out any negative weights. Under normal circumstances, table sizes should not be negative.
if weight <= 0 {
statslogutil.SingletonStatsSamplerLogger().Warn(
"Table gets a negative weight",
zap.Float64("weight", weight),
zap.Stringer("job", job),
)
}
job.SetWeight(weight)
// Push the job onto the queue.
r.Jobs.Push(job)
}
// No partitions, analyze the whole table.
if pi == nil {
job := CreateTableAnalysisJob(
sctx,
job := jobFactory.CreateNonPartitionedTableAnalysisJob(
db.O,
tblInfo,
r.statsHandle.GetTableStatsForAutoAnalyze(tblInfo),
autoAnalyzeRatio,
currentTs,
)
pushJobFunc(job)
// Skip the rest of the loop.
r.pushJob(job, calculator)
continue
}
@ -272,33 +237,27 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error {
partitionDefs = append(partitionDefs, def)
}
}
partitionStats := getPartitionStats(r.statsHandle, tblInfo, partitionDefs)
partitionStats := priorityqueue.GetPartitionStats(r.statsHandle, tblInfo, partitionDefs)
// If the prune mode is static, we need to analyze every partition as a separate table.
if pruneMode == variable.Static {
for pIDAndName, stats := range partitionStats {
job := CreateStaticPartitionAnalysisJob(
sctx,
job := jobFactory.CreateStaticPartitionAnalysisJob(
db.O,
tblInfo,
pIDAndName.ID,
pIDAndName.Name,
stats,
autoAnalyzeRatio,
currentTs,
)
pushJobFunc(job)
r.pushJob(job, calculator)
}
} else {
job := createTableAnalysisJobForPartitions(
sctx,
job := jobFactory.CreateDynamicPartitionedTableAnalysisJob(
db.O,
tblInfo,
r.statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, tblInfo.ID),
partitionStats,
autoAnalyzeRatio,
currentTs,
)
pushJobFunc(job)
r.pushJob(job, calculator)
}
}
}
@ -313,6 +272,25 @@ func (r *Refresher) RebuildTableAnalysisJobQueue() error {
return nil
}
// pushJob computes the weight of job with calculator, stores it on the job and
// pushes the job onto r.Jobs. A nil job is ignored.
//
// The weight calculation applies a penalty to larger tables, so the result can
// end up non-positive. Such a job is NOT dropped: a warning is logged and the
// job is still pushed with the weight it received.
func (r *Refresher) pushJob(job priorityqueue.AnalysisJob, calculator *priorityqueue.PriorityCalculator) {
	if job == nil {
		return
	}

	w := calculator.CalculateWeight(job)
	if w <= 0 {
		// Only report the anomaly; the job is pushed regardless.
		logger := statslogutil.SingletonStatsSamplerLogger()
		logger.Warn(
			"Table gets a negative weight",
			zap.Float64("weight", w),
			zap.Stringer("job", job),
		)
	}

	job.SetWeight(w)
	r.Jobs.Push(job)
}
// WaitAutoAnalyzeFinishedForTest waits for the auto analyze job to be finished.
// Only used in the test.
func (r *Refresher) WaitAutoAnalyzeFinishedForTest() {
@ -330,314 +308,6 @@ func (r *Refresher) Close() {
r.worker.Stop()
}
// CreateTableAnalysisJob builds the analysis job for a non-partitioned
// (physical) table.
//
// It returns nil when the table stats are not eligible for analysis, or when
// there is nothing to do: the change percentage is 0 and no index is missing
// statistics. The two conditions are checked together because a user may set
// the auto analyze ratio to 0 and still expect newly added indexes and
// never-analyzed tables to be picked up.
func CreateTableAnalysisJob(
	sctx sessionctx.Context,
	tableSchema string,
	tblInfo *model.TableInfo,
	tblStats *statistics.Table,
	autoAnalyzeRatio float64,
	currentTs uint64,
) priorityqueue.AnalysisJob {
	if !tblStats.IsEligibleForAnalysis() {
		return nil
	}

	// Start from the session's analyze version; CheckAnalyzeVerOnTable receives
	// a pointer and may overwrite it.
	statsVer := sctx.GetSessionVars().AnalyzeVersion
	statistics.CheckAnalyzeVerOnTable(tblStats, &statsVer)

	// All indicators are computed up front, before the "nothing to do" check.
	var (
		changePct      = CalculateChangePercentage(tblStats, autoAnalyzeRatio)
		size           = calculateTableSize(tblStats)
		sinceLastAnaly = GetTableLastAnalyzeDuration(tblStats, currentTs)
		missingIndexes = CheckIndexesNeedAnalyze(tblInfo, tblStats)
	)
	if changePct == 0 && len(missingIndexes) == 0 {
		return nil
	}

	return priorityqueue.NewNonPartitionedTableAnalysisJob(
		tableSchema,
		tblInfo.Name.O,
		tblInfo.ID,
		missingIndexes,
		statsVer,
		changePct,
		size,
		sinceLastAnaly,
	)
}
// CreateStaticPartitionAnalysisJob builds the analysis job for one partition
// of a partitioned table that is analyzed in static prune mode, i.e. the
// partition is handled like a separate table.
//
// globalTblInfo describes the whole partitioned table; partitionID,
// partitionName and partitionStats identify the single partition to analyze.
//
// It returns nil when the partition stats are not eligible for analysis, or
// when the change percentage is 0 and no index is missing statistics. Both
// are checked because the auto analyze ratio may be 0 while newly added
// indexes and never-analyzed partitions still need to be analyzed.
func CreateStaticPartitionAnalysisJob(
	sctx sessionctx.Context,
	tableSchema string,
	globalTblInfo *model.TableInfo,
	partitionID int64,
	partitionName string,
	partitionStats *statistics.Table,
	autoAnalyzeRatio float64,
	currentTs uint64,
) priorityqueue.AnalysisJob {
	if !partitionStats.IsEligibleForAnalysis() {
		return nil
	}

	// Start from the session's analyze version; CheckAnalyzeVerOnTable receives
	// a pointer and may overwrite it.
	statsVer := sctx.GetSessionVars().AnalyzeVersion
	statistics.CheckAnalyzeVerOnTable(partitionStats, &statsVer)

	// All indicators are computed up front, before the "nothing to do" check.
	var (
		changePct      = CalculateChangePercentage(partitionStats, autoAnalyzeRatio)
		size           = calculateTableSize(partitionStats)
		sinceLastAnaly = GetTableLastAnalyzeDuration(partitionStats, currentTs)
		missingIndexes = CheckIndexesNeedAnalyze(globalTblInfo, partitionStats)
	)
	if changePct == 0 && len(missingIndexes) == 0 {
		return nil
	}

	return priorityqueue.NewStaticPartitionTableAnalysisJob(
		tableSchema,
		globalTblInfo.Name.O,
		globalTblInfo.ID,
		partitionName,
		partitionID,
		missingIndexes,
		statsVer,
		changePct,
		size,
		sinceLastAnaly,
	)
}
// CalculateChangePercentage returns the ratio of modified rows to the row
// count of the table, or 0 when that ratio does not exceed autoAnalyzeRatio.
//
// Special cases:
//   - a table that has not been analyzed yields
//     unanalyzedTableDefaultChangePercentage;
//   - autoAnalyzeRatio == 0 disables ratio-based analysis and yields 0 (index
//     analysis is decided separately and is not affected by this).
//
// The denominator is the analyzed row count when it is positive, otherwise
// the realtime row count.
func CalculateChangePercentage(tblStats *statistics.Table, autoAnalyzeRatio float64) float64 {
	switch {
	case !tblStats.IsAnalyzed():
		return unanalyzedTableDefaultChangePercentage
	case autoAnalyzeRatio == 0:
		return 0
	}

	rowCount := float64(tblStats.RealtimeCount)
	if analyzedRows := tblStats.GetAnalyzeRowCount(); analyzedRows > 0 {
		rowCount = analyzedRows
	}

	if ratio := float64(tblStats.ModifyCount) / rowCount; ratio > autoAnalyzeRatio {
		return ratio
	}
	return 0
}
// calculateTableSize returns the size indicator of a table: its realtime row
// count multiplied by its number of columns.
// In test builds it asserts that the column count is not 0.
func calculateTableSize(tblStats *statistics.Table) float64 {
	rows := float64(tblStats.RealtimeCount)
	cols := float64(tblStats.ColAndIdxExistenceMap.ColNum())
	intest.Assert(cols != 0, "Column count should not be 0")
	return rows * cols
}
// GetTableLastAnalyzeDuration returns how much time has passed between the
// table's last analysis and the moment encoded in currentTs.
// Both ends are derived from TSO timestamps; see findLastAnalyzeTime for how
// the last analysis time is determined.
func GetTableLastAnalyzeDuration(tblStats *statistics.Table, currentTs uint64) time.Duration {
	lastAnalyzedAt := findLastAnalyzeTime(tblStats, currentTs)
	return oracle.GetTimeFromTS(currentTs).Sub(lastAnalyzedAt)
}
// findLastAnalyzeTime returns the time of the table's last analysis.
//
// For an analyzed table it is the physical time of tblStats.LastAnalyzeVersion.
// For a table that has not been analyzed there is no such version, so a fake
// time is composed instead: the time of currentTs shifted by
// unanalyzedTableDefaultLastUpdateDuration (presumably a negative offset, so
// the table appears to have been analyzed a fixed while ago — confirm at the
// constant's definition).
func findLastAnalyzeTime(tblStats *statistics.Table, currentTs uint64) time.Time {
	if tblStats.IsAnalyzed() {
		return oracle.GetTimeFromTS(tblStats.LastAnalyzeVersion)
	}
	return oracle.GetTimeFromTS(currentTs).Add(unanalyzedTableDefaultLastUpdateDuration)
}
// CheckIndexesNeedAnalyze returns the names of the public indexes of tblInfo
// that have no statistics in tblStats yet.
//
// For a table that has not been analyzed it returns nil: the whole table will
// be analyzed anyway, so individual indexes need no check. For an analyzed
// table the result is always a non-nil (possibly empty) slice.
func CheckIndexesNeedAnalyze(tblInfo *model.TableInfo, tblStats *statistics.Table) []string {
	if !tblStats.IsAnalyzed() {
		return nil
	}

	missing := make([]string, 0, len(tblInfo.Indices))
	for _, index := range tblInfo.Indices {
		// Index stats are already present.
		if tblStats.GetIdx(index.ID) != nil {
			continue
		}
		// The existence map already records the index as analyzed.
		if tblStats.ColAndIdxExistenceMap.HasAnalyzed(index.ID, true) {
			continue
		}
		// Only public indexes are considered.
		if index.State != model.StatePublic {
			continue
		}
		missing = append(missing, index.Name.O)
	}
	return missing
}
// createTableAnalysisJobForPartitions builds a single analysis job that covers
// the partitions of a partitioned table (the dynamic prune mode path).
//
// tblStats holds the stats of the table as a whole, partitionStats the stats
// of each candidate partition. The job carries the averaged indicators of the
// partitions whose change percentage is non-zero, plus a map from index name
// to the partitions missing statistics for that index.
//
// It returns nil when tblStats is not eligible for analysis, or when no
// partition and no index needs analysis. Both are checked because the auto
// analyze ratio may be 0 while newly added indexes and never-analyzed tables
// still need to be analyzed.
func createTableAnalysisJobForPartitions(
	sctx sessionctx.Context,
	tableSchema string,
	tblInfo *model.TableInfo,
	tblStats *statistics.Table,
	partitionStats map[PartitionIDAndName]*statistics.Table,
	autoAnalyzeRatio float64,
	currentTs uint64,
) priorityqueue.AnalysisJob {
	if !tblStats.IsEligibleForAnalysis() {
		return nil
	}

	// TODO: figure out how to check the table stats version correctly for partitioned tables.
	statsVer := sctx.GetSessionVars().AnalyzeVersion
	statistics.CheckAnalyzeVerOnTable(tblStats, &statsVer)

	avgChangePct, avgSize, avgSinceLastAnalyze, partitions := CalculateIndicatorsForPartitions(
		tblStats,
		partitionStats,
		autoAnalyzeRatio,
		currentTs,
	)
	indexPartitions := CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable(tblInfo, partitionStats)

	if len(partitions) == 0 && len(indexPartitions) == 0 {
		return nil
	}

	return priorityqueue.NewDynamicPartitionedTableAnalysisJob(
		tableSchema,
		tblInfo.Name.O,
		tblInfo.ID,
		partitions,
		indexPartitions,
		statsVer,
		avgChangePct,
		avgSize,
		avgSinceLastAnalyze,
	)
}
// CalculateIndicatorsForPartitions aggregates the indicators of the partitions
// that need analysis, i.e. those whose CalculateChangePercentage is non-zero.
//
// For the selected partitions it returns:
//   - avgChange: the average change percentage (modified rows / total rows);
//   - avgSize: the average size, where size = realtime row count * number of
//     columns (the column count is taken from globalStats);
//   - avgLastAnalyzeDuration: the average duration since the last analysis;
//   - partitionNames: the names of the selected partitions (map iteration
//     order, i.e. unordered).
//
// When no partition is selected all numeric results are 0 and partitionNames
// is an empty, non-nil slice.
func CalculateIndicatorsForPartitions(
	globalStats *statistics.Table,
	partitionStats map[PartitionIDAndName]*statistics.Table,
	autoAnalyzeRatio float64,
	currentTs uint64,
) (
	avgChange float64,
	avgSize float64,
	avgLastAnalyzeDuration time.Duration,
	partitionNames []string,
) {
	partitionNames = make([]string, 0, len(partitionStats))
	colNum := float64(globalStats.ColAndIdxExistenceMap.ColNum())
	intest.Assert(colNum != 0, "Column count should not be 0")

	var (
		changeSum   float64
		sizeSum     float64
		durationSum time.Duration
	)
	for partition, stats := range partitionStats {
		pct := CalculateChangePercentage(stats, autoAnalyzeRatio)
		// A zero percentage means the partition is skipped: it is below the
		// threshold, its stats are not loaded yet, or the ratio was set to 0.
		if pct == 0 {
			continue
		}
		partitionNames = append(partitionNames, partition.Name)
		changeSum += pct
		sizeSum += float64(stats.RealtimeCount) * colNum
		durationSum += GetTableLastAnalyzeDuration(stats, currentTs)
	}

	selected := len(partitionNames)
	if selected == 0 {
		return 0, 0, 0, partitionNames
	}
	return changeSum / float64(selected),
		sizeSum / float64(selected),
		durationSum / time.Duration(selected),
		partitionNames
}
// CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable finds, for every public
// index of a partitioned table, the partitions that have no statistics for it.
// The result maps an index name to the names of those partitions; indexes for
// which every partition already has statistics are not present in the map.
// NOTE: This is only for newly added indexes.
func CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable(
	tblInfo *model.TableInfo,
	partitionStats map[PartitionIDAndName]*statistics.Table,
) map[string][]string {
	result := make(map[string][]string, len(tblInfo.Indices))
	for _, index := range tblInfo.Indices {
		// Non-public indexes are never analyzed.
		if index.State != model.StatePublic {
			continue
		}

		lacking := make([]string, 0, len(partitionStats))
		for partition, stats := range partitionStats {
			// Skip partitions that already hold stats for this index, either
			// loaded or recorded as analyzed in the existence map.
			if stats.GetIdx(index.ID) != nil || stats.ColAndIdxExistenceMap.HasAnalyzed(index.ID, true) {
				continue
			}
			lacking = append(lacking, partition.Name)
		}

		if len(lacking) != 0 {
			result[index.Name.O] = lacking
		}
	}
	return result
}
func getStartTs(sctx sessionctx.Context) (uint64, error) {
txn, err := sctx.Txn(true)
if err != nil {
@ -645,48 +315,3 @@ func getStartTs(sctx sessionctx.Context) (uint64, error) {
}
return txn.StartTS(), nil
}
// PartitionIDAndName is a struct that contains the ID and name of a partition.
// It is used as the key of the per-partition statistics maps built by
// getPartitionStats.
// Exported for testing purposes. Do not use it in other packages.
type PartitionIDAndName struct {
	// Name is the partition name (getPartitionStats fills it from the
	// partition definition's Name.O).
	Name string
	// ID is the partition ID (getPartitionStats fills it from the partition
	// definition's ID).
	ID int64
}
// getPartitionStats loads the auto-analyze statistics of every partition in
// defs and returns them keyed by partition ID and name.
// Partitions whose statistics are not eligible for analysis are left out.
func getPartitionStats(
	statsHandle statstypes.StatsHandle,
	tblInfo *model.TableInfo,
	defs []model.PartitionDefinition,
) map[PartitionIDAndName]*statistics.Table {
	result := make(map[PartitionIDAndName]*statistics.Table, len(defs))
	for i := range defs {
		def := &defs[i]
		stats := statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, def.ID)
		if stats.IsEligibleForAnalysis() {
			key := PartitionIDAndName{ID: def.ID, Name: def.Name.O}
			result[key] = stats
		}
	}
	return result
}
// autoAnalysisTimeWindow is a struct that contains the start and end time of the auto analyze time window.
// A zero-valued start or end means the window is not set; isWithinTimeWindow
// returns false in that case.
type autoAnalysisTimeWindow struct {
	// start is the beginning of the window.
	start time.Time
	// end is the end of the window.
	end time.Time
}
// isWithinTimeWindow reports whether currentTime falls inside the auto analyze
// time window.
// It returns false when the window is not set (start or end is the zero time);
// otherwise the decision is delegated to timeutil.WithinDayTimePeriod.
func (a autoAnalysisTimeWindow) isWithinTimeWindow(currentTime time.Time) bool {
	// Use IsZero instead of comparing against time.Time{} with ==. The ==
	// operator also compares the Location pointer, so a zero instant that
	// carries a location would not be recognized as "not set".
	if a.start.IsZero() || a.end.IsZero() {
		return false
	}
	return timeutil.WithinDayTimePeriod(a.start, a.end, currentTime)
}

View File

@ -17,18 +17,15 @@ package refresher_test
import (
"context"
"math"
"sort"
"testing"
"time"
"github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)
func TestSkipAnalyzeTableWhenAutoAnalyzeRatioIsZero(t *testing.T) {
@ -440,450 +437,3 @@ func TestRebuildTableAnalysisJobQueue(t *testing.T) {
require.Equal(t, float64(6*2), indicators.TableSize)
require.GreaterOrEqual(t, indicators.LastAnalysisDuration, time.Duration(0))
}
// TestCalculateChangePercentage checks CalculateChangePercentage for a table
// that has not been analyzed and for an analyzed table whose modify count is
// twice its row count.
func TestCalculateChangePercentage(t *testing.T) {
	rowCount := statistics.AutoAnalyzeMinCnt + 1

	// Column/index stats without a stats version, used for the unanalyzed table.
	noVerCols := map[int64]*statistics.Column{1: {}, 2: {}}
	noVerIdxs := map[int64]*statistics.Index{1: {}, 2: {}}
	// Column/index stats carrying stats version 2, used for the analyzed table.
	v2Cols := map[int64]*statistics.Column{
		1: {StatsVer: 2},
		2: {StatsVer: 2},
	}
	v2Idxs := map[int64]*statistics.Index{
		1: {StatsVer: 2},
		2: {StatsVer: 2},
	}

	emptyExistence := statistics.NewColAndIndexExistenceMap(0, 0)
	fullExistence := statistics.NewColAndIndexExistenceMap(2, 2)
	for _, id := range []int64{1, 2} {
		fullExistence.InsertCol(id, nil, true)
	}
	for _, id := range []int64{1, 2} {
		fullExistence.InsertIndex(id, nil, true)
	}

	cases := []struct {
		name             string
		tblStats         *statistics.Table
		autoAnalyzeRatio float64
		want             float64
	}{
		{
			name: "Test Table not analyzed",
			tblStats: &statistics.Table{
				HistColl:              *statistics.NewHistCollWithColsAndIdxs(0, false, rowCount, 0, noVerCols, noVerIdxs),
				ColAndIdxExistenceMap: emptyExistence,
			},
			autoAnalyzeRatio: 0.5,
			want:             1,
		},
		{
			name: "Based on change percentage",
			tblStats: &statistics.Table{
				HistColl:              *statistics.NewHistCollWithColsAndIdxs(0, false, rowCount, rowCount*2, v2Cols, v2Idxs),
				ColAndIdxExistenceMap: fullExistence,
				LastAnalyzeVersion:    1,
			},
			autoAnalyzeRatio: 0.5,
			want:             2,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			require.Equal(t, tc.want, refresher.CalculateChangePercentage(tc.tblStats, tc.autoAnalyzeRatio))
		})
	}
}
// TestGetTableLastAnalyzeDuration checks that a table analyzed exactly one day
// before the current timestamp reports a 24h duration.
func TestGetTableLastAnalyzeDuration(t *testing.T) {
	// Last analysis: 2023-12-31 10:00:00 UTC; now: 2024-01-01 10:00:00 UTC.
	analyzedAt := time.Date(2023, 12, 31, 10, 0, 0, 0, time.UTC)
	now := time.Date(2024, 1, 1, 10, 0, 0, 0, time.UTC)
	analyzedAtTs := oracle.GoTimeToTS(analyzedAt)

	cols := map[int64]*statistics.Column{
		1: {
			StatsVer:  2,
			Histogram: statistics.Histogram{LastUpdateVersion: analyzedAtTs},
		},
	}
	tblStats := &statistics.Table{
		HistColl:           *statistics.NewHistCollWithColsAndIdxs(0, false, 0, 0, cols, nil),
		LastAnalyzeVersion: analyzedAtTs,
	}

	got := refresher.GetTableLastAnalyzeDuration(tblStats, oracle.GoTimeToTS(now))
	require.Equal(t, 24*time.Hour, got)
}
// TestGetTableLastAnalyzeDurationForUnanalyzedTable checks that a table with
// empty statistics reports the default duration of 1800 seconds.
func TestGetTableLastAnalyzeDurationForUnanalyzedTable(t *testing.T) {
	emptyStats := &statistics.Table{HistColl: statistics.HistColl{}}

	// Now: 2024-01-01 10:00:00 UTC.
	nowTs := oracle.GoTimeToTS(time.Date(2024, 1, 1, 10, 0, 0, 0, time.UTC))

	got := refresher.GetTableLastAnalyzeDuration(emptyStats, nowTs)
	require.Equal(t, 1800*time.Second, got)
}
// TestCheckIndexesNeedAnalyze checks CheckIndexesNeedAnalyze for a table that
// has not been analyzed (no indexes reported) and for an analyzed table whose
// only public index has no statistics (that index is reported).
func TestCheckIndexesNeedAnalyze(t *testing.T) {
	// Existence map holding column 1 and index 1; the index is inserted with
	// its last flag set to false (presumably "not analyzed").
	existence := statistics.NewColAndIndexExistenceMap(1, 0)
	existence.InsertCol(1, nil, true)
	existence.InsertIndex(1, nil, false)

	// Every case uses a table with a single public index named "index1".
	newTblInfo := func() *model.TableInfo {
		return &model.TableInfo{
			Indices: []*model.IndexInfo{
				{ID: 1, Name: pmodel.NewCIStr("index1"), State: model.StatePublic},
			},
		}
	}

	analyzedCols := map[int64]*statistics.Column{1: {StatsVer: 2}}

	cases := []struct {
		name     string
		tblInfo  *model.TableInfo
		tblStats *statistics.Table
		want     []string
	}{
		{
			name:     "Test Table not analyzed",
			tblInfo:  newTblInfo(),
			tblStats: &statistics.Table{ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMap(0, 0)},
			want:     nil,
		},
		{
			name:    "Test Index not analyzed",
			tblInfo: newTblInfo(),
			tblStats: &statistics.Table{
				HistColl:              *statistics.NewHistCollWithColsAndIdxs(0, false, 0, 0, analyzedCols, map[int64]*statistics.Index{}),
				ColAndIdxExistenceMap: existence,
				LastAnalyzeVersion:    1,
			},
			want: []string{"index1"},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			require.Equal(t, tc.want, refresher.CheckIndexesNeedAnalyze(tc.tblInfo, tc.tblStats))
		})
	}
}
// TestCalculateIndicatorsForPartitions checks the aggregated indicators for a
// two-partition table in three situations: both partitions unanalyzed, only
// one analyzed partition above the ratio, and no partition above the ratio.
func TestCalculateIndicatorsForPartitions(t *testing.T) {
	// Now: 2024-01-01 10:00:00 UTC; last analysis exactly one day earlier.
	nowTs := oracle.GoTimeToTS(time.Date(2024, 1, 1, 10, 0, 0, 0, time.UTC))
	analyzedAtTs := oracle.GoTimeToTS(time.Date(2023, 12, 31, 10, 0, 0, 0, time.UTC))

	unanalyzedMap := statistics.NewColAndIndexExistenceMap(0, 0)
	analyzedMap := statistics.NewColAndIndexExistenceMap(2, 1)
	analyzedMap.InsertCol(1, nil, true)
	analyzedMap.InsertCol(2, nil, true)
	analyzedMap.InsertIndex(1, nil, true)

	rowCount := statistics.AutoAnalyzeMinCnt + 1
	p0 := refresher.PartitionIDAndName{ID: 1, Name: "p0"}
	p1 := refresher.PartitionIDAndName{ID: 2, Name: "p1"}

	// Global stats only need the existence map (two columns) for the size calculation.
	newGlobalStats := func() *statistics.Table {
		return &statistics.Table{ColAndIdxExistenceMap: analyzedMap}
	}
	// A partition with rows but no analyzed columns or indexes.
	newUnanalyzedPartition := func() *statistics.Table {
		return &statistics.Table{
			HistColl: statistics.HistColl{
				Pseudo:        false,
				RealtimeCount: rowCount,
			},
			ColAndIdxExistenceMap: unanalyzedMap,
		}
	}
	// A partition analyzed at analyzedAtTs with the given number of modified rows.
	newAnalyzedPartition := func(modifyCount int64) *statistics.Table {
		cols := map[int64]*statistics.Column{
			1: {
				StatsVer:  2,
				Histogram: statistics.Histogram{LastUpdateVersion: analyzedAtTs},
			},
			2: {
				StatsVer:  2,
				Histogram: statistics.Histogram{LastUpdateVersion: analyzedAtTs},
			},
		}
		return &statistics.Table{
			HistColl:              *statistics.NewHistCollWithColsAndIdxs(0, false, rowCount, modifyCount, cols, nil),
			Version:               nowTs,
			ColAndIdxExistenceMap: analyzedMap,
			LastAnalyzeVersion:    analyzedAtTs,
		}
	}
	// Partition definitions matching p0 and p1.
	newDefs := func() []model.PartitionDefinition {
		return []model.PartitionDefinition{
			{ID: 1, Name: pmodel.NewCIStr("p0")},
			{ID: 2, Name: pmodel.NewCIStr("p1")},
		}
	}

	cases := []struct {
		name                       string
		globalStats                *statistics.Table
		partitionStats             map[refresher.PartitionIDAndName]*statistics.Table
		defs                       []model.PartitionDefinition
		autoAnalyzeRatio           float64
		currentTs                  uint64
		wantAvgChangePercentage    float64
		wantAvgSize                float64
		wantAvgLastAnalyzeDuration time.Duration
		wantPartitions             []string
	}{
		{
			name:        "Test Table not analyzed",
			globalStats: newGlobalStats(),
			partitionStats: map[refresher.PartitionIDAndName]*statistics.Table{
				p0: newUnanalyzedPartition(),
				p1: newUnanalyzedPartition(),
			},
			defs:                       newDefs(),
			autoAnalyzeRatio:           0.5,
			currentTs:                  nowTs,
			wantAvgChangePercentage:    1,
			wantAvgSize:                2002,
			wantAvgLastAnalyzeDuration: 1800 * time.Second,
			wantPartitions:             []string{"p0", "p1"},
		},
		{
			name:        "Test Table analyzed and only one partition meets the threshold",
			globalStats: newGlobalStats(),
			partitionStats: map[refresher.PartitionIDAndName]*statistics.Table{
				p0: newAnalyzedPartition(rowCount * 2),
				p1: newAnalyzedPartition(0),
			},
			defs:                       newDefs(),
			autoAnalyzeRatio:           0.5,
			currentTs:                  nowTs,
			wantAvgChangePercentage:    2,
			wantAvgSize:                2002,
			wantAvgLastAnalyzeDuration: 24 * time.Hour,
			wantPartitions:             []string{"p0"},
		},
		{
			name:        "No partition meets the threshold",
			globalStats: newGlobalStats(),
			partitionStats: map[refresher.PartitionIDAndName]*statistics.Table{
				p0: newAnalyzedPartition(0),
				p1: newAnalyzedPartition(0),
			},
			defs:                       newDefs(),
			autoAnalyzeRatio:           0.5,
			currentTs:                  nowTs,
			wantAvgChangePercentage:    0,
			wantAvgSize:                0,
			wantAvgLastAnalyzeDuration: 0,
			wantPartitions:             []string{},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			gotChange, gotSize, gotDuration, gotPartitions := refresher.CalculateIndicatorsForPartitions(
				tc.globalStats,
				tc.partitionStats,
				tc.autoAnalyzeRatio,
				tc.currentTs,
			)
			require.Equal(t, tc.wantAvgChangePercentage, gotChange)
			require.Equal(t, tc.wantAvgSize, gotSize)
			require.Equal(t, tc.wantAvgLastAnalyzeDuration, gotDuration)
			// Partition names come from map iteration; sort both sides before comparing.
			sort.Strings(tc.wantPartitions)
			sort.Strings(gotPartitions)
			require.Equal(t, tc.wantPartitions, gotPartitions)
		})
	}
}
func TestCheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable(t *testing.T) {
tblInfo := model.TableInfo{
Indices: []*model.IndexInfo{
{
ID: 1,
Name: pmodel.NewCIStr("index1"),
State: model.StatePublic,
},
{
ID: 2,
Name: pmodel.NewCIStr("index2"),
State: model.StatePublic,
},
},
Columns: []*model.ColumnInfo{
{
ID: 1,
},
{
ID: 2,
},
},
}
partitionStats := map[refresher.PartitionIDAndName]*statistics.Table{
{
ID: 1,
Name: "p0",
}: {
HistColl: *statistics.NewHistCollWithColsAndIdxs(0, false, statistics.AutoAnalyzeMinCnt+1, 0, nil, map[int64]*statistics.Index{}),
ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMap(0, 0),
},
{
ID: 2,
Name: "p1",
}: {
HistColl: *statistics.NewHistCollWithColsAndIdxs(0, false, statistics.AutoAnalyzeMinCnt+1, 0, nil, map[int64]*statistics.Index{
2: {
StatsVer: 2,
},
}),
ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMap(0, 1),
},
}
partitionIndexes := refresher.CheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable(&tblInfo, partitionStats)
expected := map[string][]string{"index1": {"p0", "p1"}, "index2": {"p0"}}
require.Equal(t, len(expected), len(partitionIndexes))
for k, v := range expected {
sort.Strings(v)
if val, ok := partitionIndexes[k]; ok {
sort.Strings(val)
require.Equal(t, v, val)
} else {
require.Fail(t, "key not found in partitionIndexes: "+k)
}
}
}