233 lines
6.8 KiB
Go
233 lines
6.8 KiB
Go
// Copyright 2024 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package priorityqueue
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/pingcap/tidb/pkg/infoschema"
|
|
"github.com/pingcap/tidb/pkg/meta/model"
|
|
"github.com/pingcap/tidb/pkg/sessionctx"
|
|
"github.com/pingcap/tidb/pkg/sessionctx/variable"
|
|
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
|
|
"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
|
|
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
|
|
statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
|
|
"github.com/pingcap/tidb/pkg/util"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// PushJobFunc is a callback that pushes an AnalysisJob to a queue.
// The code in this file stops and propagates any non-nil error it returns.
type PushJobFunc func(job AnalysisJob) error
|
|
|
|
// FetchAllTablesAndBuildAnalysisJobs builds analysis jobs for all eligible tables and partitions.
|
|
func FetchAllTablesAndBuildAnalysisJobs(
|
|
sctx sessionctx.Context,
|
|
parameters map[string]string,
|
|
autoAnalysisTimeWindow AutoAnalysisTimeWindow,
|
|
statsHandle statstypes.StatsHandle,
|
|
jobFunc PushJobFunc,
|
|
) error {
|
|
autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
|
|
pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
|
|
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
|
|
// Query locked tables once to minimize overhead.
|
|
// Outdated lock info is acceptable as we verify table lock status pre-analysis.
|
|
lockedTables, err := lockstats.QueryLockedTables(sctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Get current timestamp from the session context.
|
|
currentTs, err := getStartTs(sctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
jobFactory := NewAnalysisJobFactory(sctx, autoAnalyzeRatio, currentTs)
|
|
calculator := NewPriorityCalculator()
|
|
|
|
dbs := is.AllSchemaNames()
|
|
for _, db := range dbs {
|
|
// Sometimes the tables are too many. Auto-analyze will take too much time on it.
|
|
// so we need to check the available time.
|
|
if !autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) {
|
|
return nil
|
|
}
|
|
|
|
// Ignore the memory and system database.
|
|
if util.IsMemOrSysDB(db.L) {
|
|
continue
|
|
}
|
|
|
|
tbls, err := is.SchemaTableInfos(context.Background(), db)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// We need to check every partition of every table to see if it needs to be analyzed.
|
|
for _, tblInfo := range tbls {
|
|
// If table locked, skip analyze all partitions of the table.
|
|
if _, ok := lockedTables[tblInfo.ID]; ok {
|
|
continue
|
|
}
|
|
|
|
if tblInfo.IsView() {
|
|
continue
|
|
}
|
|
|
|
pi := tblInfo.GetPartitionInfo()
|
|
if pi == nil {
|
|
job := jobFactory.CreateNonPartitionedTableAnalysisJob(
|
|
db.O,
|
|
tblInfo,
|
|
statsHandle.GetTableStatsForAutoAnalyze(tblInfo),
|
|
)
|
|
err := setWeightAndPushJob(jobFunc, job, calculator)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Only analyze the partition that has not been locked.
|
|
partitionDefs := make([]model.PartitionDefinition, 0, len(pi.Definitions))
|
|
for _, def := range pi.Definitions {
|
|
if _, ok := lockedTables[def.ID]; !ok {
|
|
partitionDefs = append(partitionDefs, def)
|
|
}
|
|
}
|
|
partitionStats := GetPartitionStats(statsHandle, tblInfo, partitionDefs)
|
|
// If the prune mode is static, we need to analyze every partition as a separate table.
|
|
if pruneMode == variable.Static {
|
|
for pIDAndName, stats := range partitionStats {
|
|
job := jobFactory.CreateStaticPartitionAnalysisJob(
|
|
db.O,
|
|
tblInfo,
|
|
pIDAndName.ID,
|
|
pIDAndName.Name,
|
|
stats,
|
|
)
|
|
err := setWeightAndPushJob(jobFunc, job, calculator)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
} else {
|
|
job := jobFactory.CreateDynamicPartitionedTableAnalysisJob(
|
|
db.O,
|
|
tblInfo,
|
|
statsHandle.GetPartitionStatsForAutoAnalyze(tblInfo, tblInfo.ID),
|
|
partitionStats,
|
|
)
|
|
err := setWeightAndPushJob(jobFunc, job, calculator)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func setWeightAndPushJob(pushFunc PushJobFunc, job AnalysisJob, calculator *PriorityCalculator) error {
|
|
if job == nil {
|
|
return nil
|
|
}
|
|
// We apply a penalty to larger tables, which can potentially result in a negative weight.
|
|
// To prevent this, we filter out any negative weights. Under normal circumstances, table sizes should not be negative.
|
|
weight := calculator.CalculateWeight(job)
|
|
if weight <= 0 {
|
|
statslogutil.SingletonStatsSamplerLogger().Warn(
|
|
"Table gets a negative weight",
|
|
zap.Float64("weight", weight),
|
|
zap.Stringer("job", job),
|
|
)
|
|
}
|
|
job.SetWeight(weight)
|
|
// Push the job onto the queue.
|
|
return pushFunc(job)
|
|
}
|
|
|
|
func getStartTs(sctx sessionctx.Context) (uint64, error) {
|
|
txn, err := sctx.Txn(true)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return txn.StartTS(), nil
|
|
}
|
|
|
|
// AnalysisPriorityQueue is a priority queue of AnalysisJobs.
// Jobs with a greater weight are popped first.
type AnalysisPriorityQueue struct {
	// inner is the heap storage; it is manipulated through container/heap.
	inner *AnalysisInnerQueue
}
|
|
|
|
// NewAnalysisPriorityQueue creates a new AnalysisPriorityQueue.
|
|
func NewAnalysisPriorityQueue() *AnalysisPriorityQueue {
|
|
q := &AnalysisPriorityQueue{
|
|
inner: &AnalysisInnerQueue{},
|
|
}
|
|
heap.Init(q.inner)
|
|
return q
|
|
}
|
|
|
|
// Push adds a job to the priority queue with the given weight.
|
|
func (apq *AnalysisPriorityQueue) Push(job AnalysisJob) error {
|
|
heap.Push(apq.inner, job)
|
|
return nil
|
|
}
|
|
|
|
// Pop removes the highest priority job from the queue.
|
|
func (apq *AnalysisPriorityQueue) Pop() AnalysisJob {
|
|
return heap.Pop(apq.inner).(AnalysisJob)
|
|
}
|
|
|
|
// Len returns the number of jobs in the queue.
|
|
func (apq *AnalysisPriorityQueue) Len() int {
|
|
return apq.inner.Len()
|
|
}
|
|
|
|
// AnalysisInnerQueue implements heap.Interface and holds AnalysisJobs.
// Exported for testing purposes. You should not use this directly.
type AnalysisInnerQueue []AnalysisJob
|
|
|
|
// Implement the sort.Interface methods for the priority queue.
|
|
|
|
// Len returns the number of jobs stored in the queue.
func (aq AnalysisInnerQueue) Len() int {
	count := len(aq)
	return count
}
|
|
func (aq AnalysisInnerQueue) Less(i, j int) bool {
|
|
// We want Pop to give us the highest, not lowest, priority, so we use greater than here.
|
|
return aq[i].GetWeight() > aq[j].GetWeight()
|
|
}
|
|
func (aq AnalysisInnerQueue) Swap(i, j int) {
|
|
aq[i], aq[j] = aq[j], aq[i]
|
|
}
|
|
|
|
// Push adds an item to the priority queue.
|
|
func (aq *AnalysisInnerQueue) Push(x any) {
|
|
item := x.(AnalysisJob)
|
|
*aq = append(*aq, item)
|
|
}
|
|
|
|
// Pop removes the highest priority item from the queue.
|
|
func (aq *AnalysisInnerQueue) Pop() any {
|
|
old := *aq
|
|
n := len(old)
|
|
item := old[n-1]
|
|
*aq = old[0 : n-1]
|
|
return item
|
|
}
|