statistics: build the table analysis job queue part2 (#51152)

ref pingcap/tidb#50132
This commit is contained in:
二手掉包工程师
2024-02-26 19:46:59 +08:00
committed by GitHub
parent c1b70d56de
commit f4e758a89d
4 changed files with 503 additions and 10 deletions

View File

@ -44,12 +44,19 @@ const defaultFailedAnalysisWaitTime = 30 * time.Minute
// TableAnalysisJob defines the structure for table analysis job information.
type TableAnalysisJob struct {
// Only set when partitions's indexes need to be analyzed.
// It looks like: {"indexName": ["partitionName1", "partitionName2"]}
// This is only for newly added indexes.
// The reason why we need to record the partition names is that we need to analyze partitions in batch mode
// and we don't want to analyze the same partition multiple times.
// For example, the user may analyze some partitions manually, and we don't want to analyze them again.
PartitionIndexes map[string][]string
TableSchema string
TableName string
// Only set when table's indexes need to be analyzed.
// This is only for newly added indexes.
Indexes []string
// Only set when table's partitions need to be analyzed.
// This will analyze all indexes and columns of the specified partitions.
Partitions []string
TableID int64
TableStatsVer int

View File

@ -29,7 +29,7 @@ go_test(
srcs = ["refresher_test.go"],
embed = [":refresher"],
flaky = True,
shard_count = 6,
shard_count = 8,
deps = [
"//pkg/parser/model",
"//pkg/statistics",

View File

@ -35,6 +35,7 @@ import (
)
// Refresher provides methods to refresh stats info.
// NOTE: Refresher is not thread-safe.
type Refresher struct {
statsHandle statstypes.StatsHandle
sysProcTracker sessionctx.SysProcTracker
@ -110,6 +111,7 @@ func (r *Refresher) rebuildTableAnalysisJobQueue() error {
parameters := exec.GetAutoAnalyzeParameters(sctx)
autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
calculator := priorityqueue.NewPriorityCalculator(autoAnalyzeRatio)
pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
// Query locked tables once to minimize overhead.
// Outdated lock info is acceptable as we verify table lock status pre-analysis.
@ -155,7 +157,7 @@ func (r *Refresher) rebuildTableAnalysisJobQueue() error {
// Push the job onto the queue.
r.jobs.Push(job)
}
// No partitions or prune mode is static, analyze the whole table.
// No partitions, analyze the whole table.
if pi == nil {
job := createTableAnalysisJob(
sctx,
@ -166,6 +168,43 @@ func (r *Refresher) rebuildTableAnalysisJobQueue() error {
currentTs,
)
pushJobFunc(job)
// Skip the rest of the loop.
continue
}
// Only analyze the partition that has not been locked.
partitionDefs := make([]model.PartitionDefinition, 0, len(pi.Definitions))
for _, def := range pi.Definitions {
if _, ok := lockedTables[def.ID]; !ok {
partitionDefs = append(partitionDefs, def)
}
}
partitionStats := getPartitionStats(r.statsHandle, tblInfo, partitionDefs)
// If the prune mode is static, we need to analyze every partition as a separate table.
if pruneMode == variable.Static {
for _, def := range pi.Definitions {
job := createTableAnalysisJob(
sctx,
db,
tblInfo,
partitionStats[def.ID],
autoAnalyzeRatio,
currentTs,
)
pushJobFunc(job)
}
} else {
job := createTableAnalysisJobForPartitions(
sctx,
db,
tblInfo,
r.statsHandle.GetPartitionStats(tblInfo, tblInfo.ID),
partitionDefs,
partitionStats,
autoAnalyzeRatio,
currentTs,
)
pushJobFunc(job)
}
}
}
@ -196,6 +235,11 @@ func createTableAnalysisJob(
lastAnalysisDuration := getTableLastAnalyzeDuration(tblStats, currentTs)
indexes := checkIndexesNeedAnalyze(tblInfo, tblStats)
// No need to analyze.
if changePercentage == 0 && len(indexes) == 0 {
return nil
}
job := &priorityqueue.TableAnalysisJob{
TableID: tblInfo.ID,
TableSchema: tableSchema,
@ -281,6 +325,136 @@ func checkIndexesNeedAnalyze(
return indexes
}
func createTableAnalysisJobForPartitions(
sctx sessionctx.Context,
tableSchema string,
tblInfo *model.TableInfo,
tblStats *statistics.Table,
defs []model.PartitionDefinition,
partitionStats map[int64]*statistics.Table,
autoAnalyzeRatio float64,
currentTs uint64,
) *priorityqueue.TableAnalysisJob {
// TODO: figure out how to check the table stats version correctly for partitioned tables.
tableStatsVer := sctx.GetSessionVars().AnalyzeVersion
statistics.CheckAnalyzeVerOnTable(tblStats, &tableStatsVer)
averageChangePercentage, avgSize, minLastAnalyzeDuration, partitionNames := calculateIndicatorsForPartitions(
tblInfo,
partitionStats,
defs,
autoAnalyzeRatio,
currentTs,
)
partitionIndexes := checkNewlyAddedIndexesNeedAnalyzeForPartitionedTable(
tblInfo,
defs,
partitionStats,
)
// No need to analyze.
if len(partitionNames) == 0 && len(partitionIndexes) == 0 {
return nil
}
job := &priorityqueue.TableAnalysisJob{
TableID: tblInfo.ID,
TableSchema: tableSchema,
TableName: tblInfo.Name.O,
TableStatsVer: tableStatsVer,
ChangePercentage: averageChangePercentage,
TableSize: avgSize,
LastAnalysisDuration: minLastAnalyzeDuration,
Partitions: partitionNames,
PartitionIndexes: partitionIndexes,
}
return job
}
// calculateIndicatorsForPartitions calculates the average change percentage,
// average size and average last analyze duration for the partitions that meet the threshold.
// Change percentage is the ratio of the number of modified rows to the total number of rows.
// Size is the product of the number of rows and the number of columns.
// Last analyze duration is the duration since the last analyze.
func calculateIndicatorsForPartitions(
tblInfo *model.TableInfo,
partitionStats map[int64]*statistics.Table,
defs []model.PartitionDefinition,
autoAnalyzeRatio float64,
currentTs uint64,
) (
avgChange float64,
avgSize float64,
avgLastAnalyzeDuration time.Duration,
partitionNames []string,
) {
totalChangePercent := 0.0
totalSize := 0.0
count := 0.0
partitionNames = make([]string, 0, len(defs))
cols := float64(len(tblInfo.Columns))
totalLastAnalyzeDuration := time.Duration(0)
for _, def := range defs {
tblStats := partitionStats[def.ID]
changePercent := calculateChangePercentage(tblStats, autoAnalyzeRatio)
// No need to analyze the partition because it doesn't meet the threshold or stats are not loaded yet.
if changePercent == 0 {
continue
}
totalChangePercent += changePercent
// size = count * cols
totalSize += float64(tblStats.RealtimeCount) * cols
lastAnalyzeDuration := getTableLastAnalyzeDuration(tblStats, currentTs)
totalLastAnalyzeDuration += lastAnalyzeDuration
partitionNames = append(partitionNames, def.Name.O)
count++
}
if len(partitionNames) == 0 {
return 0, 0, 0, partitionNames
}
avgChange = totalChangePercent / count
avgSize = totalSize / count
avgLastAnalyzeDuration = totalLastAnalyzeDuration / time.Duration(count)
return avgChange, avgSize, avgLastAnalyzeDuration, partitionNames
}
// checkNewlyAddedIndexesNeedAnalyzeForPartitionedTable checks if the indexes of the partitioned table need to be analyzed.
// It returns a map from index name to the names of the partitions that need to be analyzed.
// NOTE: This is only for newly added indexes.
func checkNewlyAddedIndexesNeedAnalyzeForPartitionedTable(
tblInfo *model.TableInfo,
defs []model.PartitionDefinition,
partitionStats map[int64]*statistics.Table,
) map[string][]string {
partitionIndexes := make(map[string][]string, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
// No need to analyze the index if it's not public.
if idx.State != model.StatePublic {
continue
}
// Find all the partitions that need to analyze this index.
names := make([]string, 0, len(defs))
for _, def := range defs {
tblStats := partitionStats[def.ID]
if _, ok := tblStats.Indices[idx.ID]; !ok {
names = append(names, def.Name.O)
}
}
if len(names) > 0 {
partitionIndexes[idx.Name.O] = names
}
}
return partitionIndexes
}
func getStartTs(sctx sessionctx.Context) (uint64, error) {
txn, err := sctx.Txn(true)
if err != nil {
@ -288,3 +462,18 @@ func getStartTs(sctx sessionctx.Context) (uint64, error) {
}
return txn.StartTS(), nil
}
func getPartitionStats(
statsHandle statstypes.StatsHandle,
tblInfo *model.TableInfo,
defs []model.PartitionDefinition,
) map[int64]*statistics.Table {
partitionStats := make(map[int64]*statistics.Table, len(defs))
for _, def := range defs {
// TODO: use GetPartitionStatsForAutoAnalyze to save memory.
partitionStats[def.ID] = statsHandle.GetPartitionStats(tblInfo, def.ID)
}
return partitionStats
}

View File

@ -15,6 +15,7 @@
package refresher
import (
"sort"
"testing"
"time"
@ -200,15 +201,10 @@ func TestRebuildTableAnalysisJobQueue(t *testing.T) {
r, err := NewRefresher(handle, sysProcTracker)
require.NoError(t, err)
// Rebuild the job queue.
// Rebuild the job queue. No jobs are added.
err = r.rebuildTableAnalysisJobQueue()
require.NoError(t, err)
require.Equal(t, 1, r.jobs.Len())
job1 := r.jobs.Pop()
require.Equal(t, float64(1), job1.Weight)
require.Equal(t, float64(0), job1.ChangePercentage)
require.Equal(t, float64(3*2), job1.TableSize)
require.GreaterOrEqual(t, job1.LastAnalysisDuration, time.Duration(0))
require.Equal(t, 0, r.jobs.Len())
// Insert more data into t1.
tk.MustExec("insert into t1 values (4, 4), (5, 5), (6, 6)")
require.Nil(t, handle.DumpStatsDeltaToKV(true))
@ -216,7 +212,7 @@ func TestRebuildTableAnalysisJobQueue(t *testing.T) {
err = r.rebuildTableAnalysisJobQueue()
require.NoError(t, err)
require.Equal(t, 1, r.jobs.Len())
job1 = r.jobs.Pop()
job1 := r.jobs.Pop()
require.Equal(t, float64(1), job1.Weight)
require.Equal(t, float64(1), job1.ChangePercentage)
require.Equal(t, float64(6*2), job1.TableSize)
@ -376,3 +372,304 @@ func TestCheckIndexesNeedAnalyze(t *testing.T) {
})
}
}
func TestCalculateIndicatorsForPartitions(t *testing.T) {
currentTs := oracle.ComposeTS((time.Hour.Nanoseconds()+time.Second.Nanoseconds())*1000, 0)
tests := []struct {
name string
tblInfo *model.TableInfo
partitionStats map[int64]*statistics.Table
defs []model.PartitionDefinition
autoAnalyzeRatio float64
currentTs uint64
wantAvgChangePercentage float64
wantAvgSize float64
wantAvgLastAnalyzeDuration time.Duration
wantPartitions []string
}{
{
name: "Test Table not analyzed",
tblInfo: &model.TableInfo{
Indices: []*model.IndexInfo{
{
ID: 1,
Name: model.NewCIStr("index1"),
State: model.StatePublic,
},
},
Columns: []*model.ColumnInfo{
{
ID: 1,
},
{
ID: 2,
},
},
},
partitionStats: map[int64]*statistics.Table{
1: {
HistColl: statistics.HistColl{
Pseudo: false,
RealtimeCount: exec.AutoAnalyzeMinCnt + 1,
},
Version: currentTs,
},
2: {
HistColl: statistics.HistColl{
Pseudo: false,
RealtimeCount: exec.AutoAnalyzeMinCnt + 1,
},
Version: currentTs,
},
},
defs: []model.PartitionDefinition{
{
ID: 1,
Name: model.NewCIStr("p0"),
},
{
ID: 2,
Name: model.NewCIStr("p1"),
},
},
autoAnalyzeRatio: 0.5,
currentTs: currentTs,
wantAvgChangePercentage: 1,
wantAvgSize: 2002,
wantAvgLastAnalyzeDuration: 0,
wantPartitions: []string{"p0", "p1"},
},
{
name: "Test Table analyzed and only one partition meets the threshold",
tblInfo: &model.TableInfo{
Indices: []*model.IndexInfo{
{
ID: 1,
Name: model.NewCIStr("index1"),
State: model.StatePublic,
},
},
Columns: []*model.ColumnInfo{
{
ID: 1,
},
{
ID: 2,
},
},
},
partitionStats: map[int64]*statistics.Table{
1: {
HistColl: statistics.HistColl{
Pseudo: false,
RealtimeCount: exec.AutoAnalyzeMinCnt + 1,
ModifyCount: (exec.AutoAnalyzeMinCnt + 1) * 2,
Columns: map[int64]*statistics.Column{
1: {
StatsVer: 2,
},
2: {
StatsVer: 2,
},
},
},
Version: currentTs,
},
2: {
HistColl: statistics.HistColl{
Pseudo: false,
RealtimeCount: exec.AutoAnalyzeMinCnt + 1,
ModifyCount: 0,
Columns: map[int64]*statistics.Column{
1: {
StatsVer: 2,
},
2: {
StatsVer: 2,
},
},
},
Version: currentTs,
},
},
defs: []model.PartitionDefinition{
{
ID: 1,
Name: model.NewCIStr("p0"),
},
{
ID: 2,
Name: model.NewCIStr("p1"),
},
},
autoAnalyzeRatio: 0.5,
currentTs: currentTs,
wantAvgChangePercentage: 2,
wantAvgSize: 2002,
wantAvgLastAnalyzeDuration: 0,
wantPartitions: []string{"p0"},
},
{
name: "No partition meets the threshold",
tblInfo: &model.TableInfo{
Indices: []*model.IndexInfo{
{
ID: 1,
Name: model.NewCIStr("index1"),
State: model.StatePublic,
},
},
Columns: []*model.ColumnInfo{
{
ID: 1,
},
{
ID: 2,
},
},
},
partitionStats: map[int64]*statistics.Table{
1: {
HistColl: statistics.HistColl{
Pseudo: false,
RealtimeCount: exec.AutoAnalyzeMinCnt + 1,
ModifyCount: 0,
Columns: map[int64]*statistics.Column{
1: {
StatsVer: 2,
},
2: {
StatsVer: 2,
},
},
},
Version: currentTs,
},
2: {
HistColl: statistics.HistColl{
Pseudo: false,
RealtimeCount: exec.AutoAnalyzeMinCnt + 1,
ModifyCount: 0,
Columns: map[int64]*statistics.Column{
1: {
StatsVer: 2,
},
2: {
StatsVer: 2,
},
},
},
Version: currentTs,
},
},
defs: []model.PartitionDefinition{
{
ID: 1,
Name: model.NewCIStr("p0"),
},
{
ID: 2,
Name: model.NewCIStr("p1"),
},
},
autoAnalyzeRatio: 0.5,
currentTs: currentTs,
wantAvgChangePercentage: 0,
wantAvgSize: 0,
wantAvgLastAnalyzeDuration: 0,
wantPartitions: []string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotAvgChangePercentage,
gotAvgSize,
gotAvgLastAnalyzeDuration,
gotPartitions :=
calculateIndicatorsForPartitions(
tt.tblInfo,
tt.partitionStats,
tt.defs,
tt.autoAnalyzeRatio,
tt.currentTs,
)
require.Equal(t, tt.wantAvgChangePercentage, gotAvgChangePercentage)
require.Equal(t, tt.wantAvgSize, gotAvgSize)
require.Equal(t, tt.wantAvgLastAnalyzeDuration, gotAvgLastAnalyzeDuration)
require.Equal(t, tt.wantPartitions, gotPartitions)
})
}
}
func TestCheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable(t *testing.T) {
tblInfo := model.TableInfo{
Indices: []*model.IndexInfo{
{
ID: 1,
Name: model.NewCIStr("index1"),
State: model.StatePublic,
},
{
ID: 2,
Name: model.NewCIStr("index2"),
State: model.StatePublic,
},
},
Columns: []*model.ColumnInfo{
{
ID: 1,
},
{
ID: 2,
},
},
}
partitionStats := map[int64]*statistics.Table{
1: {
HistColl: statistics.HistColl{
Pseudo: false,
RealtimeCount: exec.AutoAnalyzeMinCnt + 1,
ModifyCount: 0,
Indices: map[int64]*statistics.Index{},
},
},
2: {
HistColl: statistics.HistColl{
Pseudo: false,
RealtimeCount: exec.AutoAnalyzeMinCnt + 1,
ModifyCount: 0,
Indices: map[int64]*statistics.Index{
2: {
StatsVer: 2,
},
},
},
},
}
defs := []model.PartitionDefinition{
{
ID: 1,
Name: model.NewCIStr("p0"),
},
{
ID: 2,
Name: model.NewCIStr("p1"),
},
}
partitionIndexes := checkNewlyAddedIndexesNeedAnalyzeForPartitionedTable(&tblInfo, defs, partitionStats)
expected := map[string][]string{"index1": {"p0", "p1"}, "index2": {"p0"}}
require.Equal(t, len(expected), len(partitionIndexes))
for k, v := range expected {
sort.Strings(v)
if val, ok := partitionIndexes[k]; ok {
sort.Strings(val)
require.Equal(t, v, val)
} else {
require.Fail(t, "key not found in partitionIndexes: "+k)
}
}
}