statistics: add priority calculator (#51346)

ref pingcap/tidb#50132
This commit is contained in:
二手掉包工程师
2024-02-29 19:32:01 +08:00
committed by GitHub
parent 2558b0c307
commit 42c8d2d062
6 changed files with 209 additions and 17 deletions

View File

@ -25,6 +25,7 @@ go_test(
name = "priorityqueue_test",
timeout = "short",
srcs = [
"calculator_test.go",
"interval_test.go",
"job_test.go",
"main_test.go",
@ -32,7 +33,7 @@ go_test(
],
embed = [":priorityqueue"],
flaky = True,
shard_count = 15,
shard_count = 17,
deps = [
"//pkg/parser/model",
"//pkg/session",

View File

@ -14,23 +14,57 @@
package priorityqueue
// WeightCalculator is an interface for calculating weights of analysis jobs.
type WeightCalculator interface {
CalculateWeight(job *TableAnalysisJob) float64
}
import "math"
const (
// EventNone represents no special event.
eventNone = 0.0
// EventNewIndex represents a special event for newly added indexes.
eventNewIndex = 2.0
)
// TODO: make these configurable.
const (
changeRatioWeight = 0.6
sizeWeight = 0.1
analysisInterval = 0.3
)
// PriorityCalculator implements the WeightCalculator interface.
type PriorityCalculator struct {
threshold float64
}
type PriorityCalculator struct{}
// NewPriorityCalculator creates a new PriorityCalculator with the given threshold.
func NewPriorityCalculator(threshold float64) *PriorityCalculator {
return &PriorityCalculator{threshold: threshold}
// NewPriorityCalculator creates a new PriorityCalculator.
//
// For more information, please visit:
// https://github.com/pingcap/tidb/blob/master/docs/design/2023-11-29-priority-queue-for-auto-analyze.md
func NewPriorityCalculator() *PriorityCalculator {
return &PriorityCalculator{}
}
// CalculateWeight calculates the weight based on the given rules.
func (*PriorityCalculator) CalculateWeight(_ *TableAnalysisJob) float64 {
// TODO: implement the weight calculation
return 1
// - Table Change Ratio (Change Ratio): Accounts for 60%
// - Table Size (Size): Accounts for 10%
// - Analysis Interval (Analysis Interval): Accounts for 30%
// priority_score calculates the priority score based on the following formula:
//
// priority_score = (0.6 * math.Log10(1 + ChangeRatio) +
// 0.1 * (1 - math.Log10(1 + TableSize)) +
// 0.3 * math.Log10(1 + math.Sqrt(AnalysisInterval)) +
// special_event[event])
func (pc *PriorityCalculator) CalculateWeight(job *TableAnalysisJob) float64 {
// We multiply the priority_score by 100 to increase its magnitude. This ensures that
// when we apply the log10 function, the resulting value is more meaningful and reasonable.
changeRatio := 100 * job.ChangePercentage
return changeRatioWeight*math.Log10(1+changeRatio) +
sizeWeight*(1-math.Log10(1+job.TableSize)) +
analysisInterval*math.Log10(1+math.Sqrt(job.LastAnalysisDuration.Seconds())) +
pc.getSpecialEvent(job)
}
func (*PriorityCalculator) getSpecialEvent(job *TableAnalysisJob) float64 {
if job.HasNewlyAddedIndex() {
return eventNewIndex
}
return eventNone
}

View File

@ -0,0 +1,144 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package priorityqueue
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
type testData struct {
ID int
ChangePercentage float64
TableSize float64
LastAnalysisDuration time.Duration
}
func TestCalculateWeight(t *testing.T) {
// Note: all group are sorted by weight in ascending order.
pc := NewPriorityCalculator()
// Only focus on change percentage. Bigger change percentage, higher weight.
changePercentageGroup := []testData{
{
ChangePercentage: 0.6,
TableSize: 1000,
LastAnalysisDuration: time.Hour,
},
{
ChangePercentage: 1,
TableSize: 1000,
LastAnalysisDuration: time.Hour,
},
{
ChangePercentage: 10,
TableSize: 1000,
LastAnalysisDuration: time.Hour,
},
}
testWeightCalculation(t, pc, changePercentageGroup)
// Only focus on table size. Bigger table size, lower weight.
tableSizeGroup := []testData{
{
ChangePercentage: 0.6,
TableSize: 100000,
LastAnalysisDuration: time.Hour,
},
{
ChangePercentage: 0.6,
TableSize: 10000,
LastAnalysisDuration: time.Hour,
},
{
ChangePercentage: 0.6,
TableSize: 1000,
LastAnalysisDuration: time.Hour,
},
}
testWeightCalculation(t, pc, tableSizeGroup)
// Only focus on last analysis duration. Longer duration, higher weight.
lastAnalysisDurationGroup := []testData{
{
ChangePercentage: 0.6,
TableSize: 1000,
LastAnalysisDuration: time.Hour,
},
{
ChangePercentage: 0.6,
TableSize: 1000,
LastAnalysisDuration: time.Hour * 12,
},
{
ChangePercentage: 0.6,
TableSize: 1000,
LastAnalysisDuration: time.Hour * 24,
},
}
testWeightCalculation(t, pc, lastAnalysisDurationGroup)
// The system should not assign a higher weight to a recently analyzed table, even if it has undergone significant changes.
justBeingAnalyzedGroup := []testData{
{
ChangePercentage: 0.5,
TableSize: 1000,
LastAnalysisDuration: 2 * time.Hour,
},
{
ChangePercentage: 1,
TableSize: 1000,
LastAnalysisDuration: 10 * time.Minute,
},
}
testWeightCalculation(t, pc, justBeingAnalyzedGroup)
}
// testWeightCalculation is a helper function to test the weight calculation.
// It will check if the weight is increasing for each test data group.
func testWeightCalculation(t *testing.T, pc *PriorityCalculator, group []testData) {
prevWeight := -1.0
for _, tc := range group {
job := &TableAnalysisJob{
ChangePercentage: tc.ChangePercentage,
TableSize: tc.TableSize,
LastAnalysisDuration: tc.LastAnalysisDuration,
}
weight := pc.CalculateWeight(job)
require.Greater(t, weight, 0.0)
require.Greater(t, weight, prevWeight)
prevWeight = weight
}
}
func TestGetSpecialEvent(t *testing.T) {
pc := NewPriorityCalculator()
jobWithIndex := &TableAnalysisJob{
PartitionIndexes: map[string][]string{
"index1": {"p1", "p2"},
},
}
require.Equal(t, eventNewIndex, pc.getSpecialEvent(jobWithIndex))
jobWithIndex = &TableAnalysisJob{
Indexes: []string{"index1"},
}
require.Equal(t, eventNewIndex, pc.getSpecialEvent(jobWithIndex))
jobWithoutIndex := &TableAnalysisJob{
PartitionIndexes: map[string][]string{},
Indexes: []string{},
}
require.Equal(t, eventNone, pc.getSpecialEvent(jobWithoutIndex))
}

View File

@ -66,6 +66,11 @@ type TableAnalysisJob struct {
Weight float64
}
// HasNewlyAddedIndex checks whether the table has newly added index.
func (j *TableAnalysisJob) HasNewlyAddedIndex() bool {
return len(j.PartitionIndexes) > 0 || len(j.Indexes) > 0
}
// IsValidToAnalyze checks whether the table is valid to analyze.
// It checks the last failed analysis duration and the average analysis duration.
// If the last failed analysis duration is less than 2 times the average analysis duration,

View File

@ -117,7 +117,7 @@ func (r *Refresher) rebuildTableAnalysisJobQueue() error {
func(sctx sessionctx.Context) error {
parameters := exec.GetAutoAnalyzeParameters(sctx)
autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
calculator := priorityqueue.NewPriorityCalculator(autoAnalyzeRatio)
calculator := priorityqueue.NewPriorityCalculator()
pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
// Query locked tables once to minimize overhead.
@ -158,7 +158,14 @@ func (r *Refresher) rebuildTableAnalysisJobQueue() error {
}
// Calculate the weight of the job.
job.Weight = calculator.CalculateWeight(job)
if job.Weight == 0 {
// We apply a penalty to larger tables, which can potentially result in a negative weight.
// To prevent this, we filter out any negative weights. Under normal circumstances, table sizes should not be negative.
if job.Weight <= 0 {
statslogutil.StatsLogger().Info(
"Table is not ready to analyze",
zap.String("reason", "weight is not positive"),
zap.Stringer("job", job),
)
return
}
// Push the job onto the queue.

View File

@ -15,6 +15,7 @@
package refresher
import (
"math"
"sort"
"testing"
"time"
@ -213,7 +214,7 @@ func TestRebuildTableAnalysisJobQueue(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, r.jobs.Len())
job1 := r.jobs.Pop()
require.Equal(t, float64(1), job1.Weight)
require.Equal(t, 1.2, math.Round(job1.Weight*10)/10)
require.Equal(t, float64(1), job1.ChangePercentage)
require.Equal(t, float64(6*2), job1.TableSize)
require.GreaterOrEqual(t, job1.LastAnalysisDuration, time.Duration(0))