diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go index 31e3a2f938..f432da863d 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/job.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/job.go @@ -44,12 +44,19 @@ const defaultFailedAnalysisWaitTime = 30 * time.Minute // TableAnalysisJob defines the structure for table analysis job information. type TableAnalysisJob struct { // Only set when partitions's indexes need to be analyzed. + // It looks like: {"indexName": ["partitionName1", "partitionName2"]} + // This is only for newly added indexes. + // The reason why we need to record the partition names is that we need to analyze partitions in batch mode + // and we don't want to analyze the same partition multiple times. + // For example, the user may analyze some partitions manually, and we don't want to analyze them again. PartitionIndexes map[string][]string TableSchema string TableName string // Only set when table's indexes need to be analyzed. + // This is only for newly added indexes. Indexes []string // Only set when table's partitions need to be analyzed. + // This will analyze all indexes and columns of the specified partitions. 
Partitions []string TableID int64 TableStatsVer int diff --git a/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel b/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel index e1112d37bd..25f4ce38f4 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel @@ -29,7 +29,7 @@ go_test( srcs = ["refresher_test.go"], embed = [":refresher"], flaky = True, - shard_count = 6, + shard_count = 8, deps = [ "//pkg/parser/model", "//pkg/statistics", diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher.go b/pkg/statistics/handle/autoanalyze/refresher/refresher.go index 32b9fb3b02..e210eb75e1 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher.go @@ -35,6 +35,7 @@ import ( ) // Refresher provides methods to refresh stats info. +// NOTE: Refresher is not thread-safe. type Refresher struct { statsHandle statstypes.StatsHandle sysProcTracker sessionctx.SysProcTracker @@ -110,6 +111,7 @@ func (r *Refresher) rebuildTableAnalysisJobQueue() error { parameters := exec.GetAutoAnalyzeParameters(sctx) autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio]) calculator := priorityqueue.NewPriorityCalculator(autoAnalyzeRatio) + pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load()) is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) // Query locked tables once to minimize overhead. // Outdated lock info is acceptable as we verify table lock status pre-analysis. @@ -155,7 +157,7 @@ func (r *Refresher) rebuildTableAnalysisJobQueue() error { // Push the job onto the queue. r.jobs.Push(job) } - // No partitions or prune mode is static, analyze the whole table. + // No partitions, analyze the whole table. 
if pi == nil { job := createTableAnalysisJob( sctx, @@ -166,6 +168,43 @@ func (r *Refresher) rebuildTableAnalysisJobQueue() error { currentTs, ) pushJobFunc(job) + // Skip the rest of the loop. + continue + } + + // Only analyze the partition that has not been locked. + partitionDefs := make([]model.PartitionDefinition, 0, len(pi.Definitions)) + for _, def := range pi.Definitions { + if _, ok := lockedTables[def.ID]; !ok { + partitionDefs = append(partitionDefs, def) + } + } + partitionStats := getPartitionStats(r.statsHandle, tblInfo, partitionDefs) + // If the prune mode is static, we need to analyze every partition as a separate table. + if pruneMode == variable.Static { + for _, def := range partitionDefs { + job := createTableAnalysisJob( + sctx, + db, + tblInfo, + partitionStats[def.ID], + autoAnalyzeRatio, + currentTs, + ) + pushJobFunc(job) + } + } else { + job := createTableAnalysisJobForPartitions( + sctx, + db, + tblInfo, + r.statsHandle.GetPartitionStats(tblInfo, tblInfo.ID), + partitionDefs, + partitionStats, + autoAnalyzeRatio, + currentTs, + ) + pushJobFunc(job) } } } @@ -196,6 +235,11 @@ func createTableAnalysisJob( lastAnalysisDuration := getTableLastAnalyzeDuration(tblStats, currentTs) indexes := checkIndexesNeedAnalyze(tblInfo, tblStats) + // No need to analyze. + if changePercentage == 0 && len(indexes) == 0 { + return nil + } + job := &priorityqueue.TableAnalysisJob{ TableID: tblInfo.ID, TableSchema: tableSchema, @@ -281,6 +325,136 @@ func checkIndexesNeedAnalyze( return indexes } +func createTableAnalysisJobForPartitions( + sctx sessionctx.Context, + tableSchema string, + tblInfo *model.TableInfo, + tblStats *statistics.Table, + defs []model.PartitionDefinition, + partitionStats map[int64]*statistics.Table, + autoAnalyzeRatio float64, + currentTs uint64, +) *priorityqueue.TableAnalysisJob { + // TODO: figure out how to check the table stats version correctly for partitioned tables. 
+ tableStatsVer := sctx.GetSessionVars().AnalyzeVersion + statistics.CheckAnalyzeVerOnTable(tblStats, &tableStatsVer) + + averageChangePercentage, avgSize, avgLastAnalyzeDuration, partitionNames := calculateIndicatorsForPartitions( + tblInfo, + partitionStats, + defs, + autoAnalyzeRatio, + currentTs, + ) + partitionIndexes := checkNewlyAddedIndexesNeedAnalyzeForPartitionedTable( + tblInfo, + defs, + partitionStats, + ) + // No need to analyze. + if len(partitionNames) == 0 && len(partitionIndexes) == 0 { + return nil + } + + job := &priorityqueue.TableAnalysisJob{ + TableID: tblInfo.ID, + TableSchema: tableSchema, + TableName: tblInfo.Name.O, + TableStatsVer: tableStatsVer, + ChangePercentage: averageChangePercentage, + TableSize: avgSize, + LastAnalysisDuration: avgLastAnalyzeDuration, + Partitions: partitionNames, + PartitionIndexes: partitionIndexes, + } + + return job +} + +// calculateIndicatorsForPartitions calculates the average change percentage, +// average size and average last analyze duration for the partitions that meet the threshold. +// Change percentage is the ratio of the number of modified rows to the total number of rows. +// Size is the product of the number of rows and the number of columns. +// Last analyze duration is the duration since the last analyze. 
+func calculateIndicatorsForPartitions( + tblInfo *model.TableInfo, + partitionStats map[int64]*statistics.Table, + defs []model.PartitionDefinition, + autoAnalyzeRatio float64, + currentTs uint64, +) ( + avgChange float64, + avgSize float64, + avgLastAnalyzeDuration time.Duration, + partitionNames []string, +) { + totalChangePercent := 0.0 + totalSize := 0.0 + count := 0.0 + partitionNames = make([]string, 0, len(defs)) + cols := float64(len(tblInfo.Columns)) + totalLastAnalyzeDuration := time.Duration(0) + + for _, def := range defs { + tblStats := partitionStats[def.ID] + changePercent := calculateChangePercentage(tblStats, autoAnalyzeRatio) + // No need to analyze the partition because it doesn't meet the threshold or stats are not loaded yet. + if changePercent == 0 { + continue + } + + totalChangePercent += changePercent + // size = count * cols + totalSize += float64(tblStats.RealtimeCount) * cols + lastAnalyzeDuration := getTableLastAnalyzeDuration(tblStats, currentTs) + totalLastAnalyzeDuration += lastAnalyzeDuration + partitionNames = append(partitionNames, def.Name.O) + count++ + } + if len(partitionNames) == 0 { + return 0, 0, 0, partitionNames + } + + avgChange = totalChangePercent / count + avgSize = totalSize / count + avgLastAnalyzeDuration = totalLastAnalyzeDuration / time.Duration(count) + + return avgChange, avgSize, avgLastAnalyzeDuration, partitionNames +} + +// checkNewlyAddedIndexesNeedAnalyzeForPartitionedTable checks if the indexes of the partitioned table need to be analyzed. +// It returns a map from index name to the names of the partitions that need to be analyzed. +// NOTE: This is only for newly added indexes. 
+func checkNewlyAddedIndexesNeedAnalyzeForPartitionedTable( + tblInfo *model.TableInfo, + defs []model.PartitionDefinition, + partitionStats map[int64]*statistics.Table, +) map[string][]string { + partitionIndexes := make(map[string][]string, len(tblInfo.Indices)) + + for _, idx := range tblInfo.Indices { + // No need to analyze the index if it's not public. + if idx.State != model.StatePublic { + continue + } + + // Find all the partitions that need to analyze this index. + names := make([]string, 0, len(defs)) + for _, def := range defs { + tblStats := partitionStats[def.ID] + if _, ok := tblStats.Indices[idx.ID]; !ok { + names = append(names, def.Name.O) + } + } + + if len(names) > 0 { + partitionIndexes[idx.Name.O] = names + } + } + + return partitionIndexes +} + func getStartTs(sctx sessionctx.Context) (uint64, error) { txn, err := sctx.Txn(true) if err != nil { @@ -288,3 +462,18 @@ func getStartTs(sctx sessionctx.Context) (uint64, error) { } return txn.StartTS(), nil } + +func getPartitionStats( + statsHandle statstypes.StatsHandle, + tblInfo *model.TableInfo, + defs []model.PartitionDefinition, +) map[int64]*statistics.Table { + partitionStats := make(map[int64]*statistics.Table, len(defs)) + + for _, def := range defs { + // TODO: use GetPartitionStatsForAutoAnalyze to save memory. + partitionStats[def.ID] = statsHandle.GetPartitionStats(tblInfo, def.ID) + } + + return partitionStats +} diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go b/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go index ba3a9f11a7..cba54a744c 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go @@ -15,6 +15,7 @@ package refresher import ( + "sort" "testing" "time" @@ -200,15 +201,10 @@ func TestRebuildTableAnalysisJobQueue(t *testing.T) { r, err := NewRefresher(handle, sysProcTracker) require.NoError(t, err) - // Rebuild the job queue. 
+ // Rebuild the job queue. No jobs are added. err = r.rebuildTableAnalysisJobQueue() require.NoError(t, err) - require.Equal(t, 1, r.jobs.Len()) - job1 := r.jobs.Pop() - require.Equal(t, float64(1), job1.Weight) - require.Equal(t, float64(0), job1.ChangePercentage) - require.Equal(t, float64(3*2), job1.TableSize) - require.GreaterOrEqual(t, job1.LastAnalysisDuration, time.Duration(0)) + require.Equal(t, 0, r.jobs.Len()) // Insert more data into t1. tk.MustExec("insert into t1 values (4, 4), (5, 5), (6, 6)") require.Nil(t, handle.DumpStatsDeltaToKV(true)) @@ -216,7 +212,7 @@ func TestRebuildTableAnalysisJobQueue(t *testing.T) { err = r.rebuildTableAnalysisJobQueue() require.NoError(t, err) require.Equal(t, 1, r.jobs.Len()) - job1 = r.jobs.Pop() + job1 := r.jobs.Pop() require.Equal(t, float64(1), job1.Weight) require.Equal(t, float64(1), job1.ChangePercentage) require.Equal(t, float64(6*2), job1.TableSize) @@ -376,3 +372,304 @@ func TestCheckIndexesNeedAnalyze(t *testing.T) { }) } } + +func TestCalculateIndicatorsForPartitions(t *testing.T) { + currentTs := oracle.ComposeTS((time.Hour.Nanoseconds()+time.Second.Nanoseconds())*1000, 0) + + tests := []struct { + name string + tblInfo *model.TableInfo + partitionStats map[int64]*statistics.Table + defs []model.PartitionDefinition + autoAnalyzeRatio float64 + currentTs uint64 + wantAvgChangePercentage float64 + wantAvgSize float64 + wantAvgLastAnalyzeDuration time.Duration + wantPartitions []string + }{ + { + name: "Test Table not analyzed", + tblInfo: &model.TableInfo{ + Indices: []*model.IndexInfo{ + { + ID: 1, + Name: model.NewCIStr("index1"), + State: model.StatePublic, + }, + }, + Columns: []*model.ColumnInfo{ + { + ID: 1, + }, + { + ID: 2, + }, + }, + }, + partitionStats: map[int64]*statistics.Table{ + 1: { + HistColl: statistics.HistColl{ + Pseudo: false, + RealtimeCount: exec.AutoAnalyzeMinCnt + 1, + }, + Version: currentTs, + }, + 2: { + HistColl: statistics.HistColl{ + Pseudo: false, + RealtimeCount: 
exec.AutoAnalyzeMinCnt + 1, + }, + Version: currentTs, + }, + }, + defs: []model.PartitionDefinition{ + { + ID: 1, + Name: model.NewCIStr("p0"), + }, + { + ID: 2, + Name: model.NewCIStr("p1"), + }, + }, + autoAnalyzeRatio: 0.5, + currentTs: currentTs, + wantAvgChangePercentage: 1, + wantAvgSize: 2002, + wantAvgLastAnalyzeDuration: 0, + wantPartitions: []string{"p0", "p1"}, + }, + { + name: "Test Table analyzed and only one partition meets the threshold", + tblInfo: &model.TableInfo{ + Indices: []*model.IndexInfo{ + { + ID: 1, + Name: model.NewCIStr("index1"), + State: model.StatePublic, + }, + }, + Columns: []*model.ColumnInfo{ + { + ID: 1, + }, + { + ID: 2, + }, + }, + }, + partitionStats: map[int64]*statistics.Table{ + 1: { + HistColl: statistics.HistColl{ + Pseudo: false, + RealtimeCount: exec.AutoAnalyzeMinCnt + 1, + ModifyCount: (exec.AutoAnalyzeMinCnt + 1) * 2, + Columns: map[int64]*statistics.Column{ + 1: { + StatsVer: 2, + }, + 2: { + StatsVer: 2, + }, + }, + }, + Version: currentTs, + }, + 2: { + HistColl: statistics.HistColl{ + Pseudo: false, + RealtimeCount: exec.AutoAnalyzeMinCnt + 1, + ModifyCount: 0, + Columns: map[int64]*statistics.Column{ + 1: { + StatsVer: 2, + }, + 2: { + StatsVer: 2, + }, + }, + }, + Version: currentTs, + }, + }, + defs: []model.PartitionDefinition{ + { + ID: 1, + Name: model.NewCIStr("p0"), + }, + { + ID: 2, + Name: model.NewCIStr("p1"), + }, + }, + autoAnalyzeRatio: 0.5, + currentTs: currentTs, + wantAvgChangePercentage: 2, + wantAvgSize: 2002, + wantAvgLastAnalyzeDuration: 0, + wantPartitions: []string{"p0"}, + }, + { + name: "No partition meets the threshold", + tblInfo: &model.TableInfo{ + Indices: []*model.IndexInfo{ + { + ID: 1, + Name: model.NewCIStr("index1"), + State: model.StatePublic, + }, + }, + Columns: []*model.ColumnInfo{ + { + ID: 1, + }, + { + ID: 2, + }, + }, + }, + partitionStats: map[int64]*statistics.Table{ + 1: { + HistColl: statistics.HistColl{ + Pseudo: false, + RealtimeCount: exec.AutoAnalyzeMinCnt + 1, 
+ ModifyCount: 0, + Columns: map[int64]*statistics.Column{ + 1: { + StatsVer: 2, + }, + 2: { + StatsVer: 2, + }, + }, + }, + Version: currentTs, + }, + 2: { + HistColl: statistics.HistColl{ + Pseudo: false, + RealtimeCount: exec.AutoAnalyzeMinCnt + 1, + ModifyCount: 0, + Columns: map[int64]*statistics.Column{ + 1: { + StatsVer: 2, + }, + 2: { + StatsVer: 2, + }, + }, + }, + Version: currentTs, + }, + }, + defs: []model.PartitionDefinition{ + { + ID: 1, + Name: model.NewCIStr("p0"), + }, + { + ID: 2, + Name: model.NewCIStr("p1"), + }, + }, + autoAnalyzeRatio: 0.5, + currentTs: currentTs, + wantAvgChangePercentage: 0, + wantAvgSize: 0, + wantAvgLastAnalyzeDuration: 0, + wantPartitions: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotAvgChangePercentage, + gotAvgSize, + gotAvgLastAnalyzeDuration, + gotPartitions := + calculateIndicatorsForPartitions( + tt.tblInfo, + tt.partitionStats, + tt.defs, + tt.autoAnalyzeRatio, + tt.currentTs, + ) + require.Equal(t, tt.wantAvgChangePercentage, gotAvgChangePercentage) + require.Equal(t, tt.wantAvgSize, gotAvgSize) + require.Equal(t, tt.wantAvgLastAnalyzeDuration, gotAvgLastAnalyzeDuration) + require.Equal(t, tt.wantPartitions, gotPartitions) + }) + } +} + +func TestCheckNewlyAddedIndexesNeedAnalyzeForPartitionedTable(t *testing.T) { + tblInfo := model.TableInfo{ + Indices: []*model.IndexInfo{ + { + ID: 1, + Name: model.NewCIStr("index1"), + State: model.StatePublic, + }, + { + ID: 2, + Name: model.NewCIStr("index2"), + State: model.StatePublic, + }, + }, + Columns: []*model.ColumnInfo{ + { + ID: 1, + }, + { + ID: 2, + }, + }, + } + partitionStats := map[int64]*statistics.Table{ + 1: { + HistColl: statistics.HistColl{ + Pseudo: false, + RealtimeCount: exec.AutoAnalyzeMinCnt + 1, + ModifyCount: 0, + Indices: map[int64]*statistics.Index{}, + }, + }, + 2: { + HistColl: statistics.HistColl{ + Pseudo: false, + RealtimeCount: exec.AutoAnalyzeMinCnt + 1, + ModifyCount: 0, + Indices: 
map[int64]*statistics.Index{ + 2: { + StatsVer: 2, + }, + }, + }, + }, + } + defs := []model.PartitionDefinition{ + { + ID: 1, + Name: model.NewCIStr("p0"), + }, + { + ID: 2, + Name: model.NewCIStr("p1"), + }, + } + + partitionIndexes := checkNewlyAddedIndexesNeedAnalyzeForPartitionedTable(&tblInfo, defs, partitionStats) + expected := map[string][]string{"index1": {"p0", "p1"}, "index2": {"p0"}} + require.Equal(t, len(expected), len(partitionIndexes)) + + for k, v := range expected { + sort.Strings(v) + if val, ok := partitionIndexes[k]; ok { + sort.Strings(val) + require.Equal(t, v, val) + } else { + require.Fail(t, "key not found in partitionIndexes: "+k) + } + } +}