// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package priorityqueue

import (
	"context"
	"runtime"
	"sort"
	"sync"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/infoschema"
	"github.com/pingcap/tidb/pkg/meta"
	"github.com/pingcap/tidb/pkg/meta/metadef"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/sessionctx/vardef"
	"github.com/pingcap/tidb/pkg/sessionctx/variable"
	"github.com/pingcap/tidb/pkg/statistics"
	"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
	"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
	statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
	statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types"
	statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
	"github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/intest"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"go.uber.org/zap"
)

const notInitializedErrMsg = "priority queue not initialized"

const (
	lastAnalysisDurationRefreshInterval = time.Minute * 10
	dmlChangesFetchInterval             = time.Minute * 2
	mustRetryJobRequeueInterval         = time.Minute * 5
)

// If the process takes longer than this threshold, we log it as a slow operation.
const slowLogThreshold = 150 * time.Millisecond

// Every 15 minutes, at most 1 log will be output.
var queueSamplerLogger = logutil.SampleLoggerFactory(15*time.Minute, 1, zap.String(logutil.LogFieldCategory, "stats"))

// pqHeap is an interface that wraps the methods of a priority queue heap.
type pqHeap interface {
	// getByKey returns the job by the given table ID.
	getByKey(tableID int64) (AnalysisJob, bool, error)
	// addOrUpdate adds a job to the heap or updates the job if it already exists.
	addOrUpdate(job AnalysisJob) error
	// update updates a job in the heap.
	update(job AnalysisJob) error
	// delete deletes a job from the heap.
	delete(job AnalysisJob) error
	// list returns all jobs in the heap.
	list() []AnalysisJob
	// pop pops the job with the highest priority from the heap.
	pop() (AnalysisJob, error)
	// peek returns the job with the highest priority from the heap without removing it.
	peek() (AnalysisJob, error)
	// isEmpty returns true if the heap is empty.
	isEmpty() bool
	// len returns the number of jobs in the heap.
	len() int
}
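
// The following is a minimal, illustrative sketch of how the pqHeap contract above is
// meant to be consumed: pop() hands out jobs in priority order until isEmpty() reports
// that the heap is drained. drainHeapSketch is a hypothetical helper shown here purely
// as documentation; it is not part of this package's API.
//
//	func drainHeapSketch(h pqHeap) ([]AnalysisJob, error) {
//		jobs := make([]AnalysisJob, 0, h.len())
//		for !h.isEmpty() {
//			job, err := h.pop()
//			if err != nil {
//				return nil, err
//			}
//			jobs = append(jobs, job)
//		}
//		return jobs, nil
//	}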

// AnalysisPriorityQueue is a priority queue for TableAnalysisJobs.
// Testing shows that keeping all jobs in memory is feasible.
// Memory usage for one million tables is approximately 300 to 500 MiB, which is acceptable.
// Typically, not many tables need to be analyzed simultaneously.
//
// ┌─────────────────────────────────────────────────────────────────────────────────────┐
// │                          LIFECYCLE & THREAD SAFETY GUIDE                             │
// └─────────────────────────────────────────────────────────────────────────────────────┘
//
// GOROUTINE ARCHITECTURE:
// ═══════════════════════════════════════════════════════════════════════════════════════
//
// This priority queue system involves the following goroutines:
//
// 1. [Auto-Analyze Worker] - The main ticker loop in domain.autoAnalyzeWorker()
//    - Runs every stats lease interval (~3 seconds)
//    - Calls Initialize() or Close() based on ownership and configuration
//    - Calls Pop() to retrieve analysis jobs and submits them for execution
//
// 2. [Queue Worker] - The background goroutine started by Initialize() (run() function)
//    - Periodically processes DML changes (every 2 minutes)
//    - Refreshes last analysis duration (every 10 minutes)
//    - Requeues must-retry jobs (every 5 minutes)
//    - Exits when context is canceled by Close()
//
// 3. [Job Executor] - The goroutine(s) that execute actual ANALYZE jobs
//    - Started when a job is submitted for execution (after Pop())
//    - Calls job hooks (success/failure) when analysis completes
//    - May run concurrently with queue operations
//
// 4. [DDL Handler] - Goroutine(s) that handle DDL events
//    - Calls HandleDDLEvent() when schema changes occur
//    - May run concurrently with other goroutines
//
// ═══════════════════════════════════════════════════════════════════════════════════════
// SCENARIO 1: Normal Lifecycle (Ticker-Based Ownership Management)
// ═══════════════════════════════════════════════════════════════════════════════════════
//
// The priority queue is managed by a ticker-based loop in domain.autoAnalyzeWorker().
// Every stats lease interval (default: 3 seconds), the loop checks:
//
//   - If auto-analyze is enabled AND instance is owner     → call HandleAutoAnalyze()
//
//   - If auto-analyze is disabled OR instance is not owner → call ClosePriorityQueue()
//
//   Auto-Analyze Worker              Priority Queue                       Queue Worker
//   ───────────────────              ──────────────                       ────────────
//   <ticker fires>
//        │
//        ├──check: RunAutoAnalyze.Load() && IsOwner()?
//        │                 │
//        │                 └──> YES
//        │
//   AnalyzeHighestPriorityTables()
//        │
//        ├──IsInitialized()?
//        │                 │
//        │                 └──> false
//        │
//        ├──Initialize(ctx) ──────────────────────────────────────> run() starts
//        │                 │                                              │
//        │                 ├──fetchAllTablesAndBuildAnalysisJobs()        │
//        │                 ├──set initialized=true                        │
//        │                 └──spawn run() goroutine                       │
//        │                                                                │
//        ├──Pop() ──────────────────┐                                     ├──ProcessDMLChanges()
//        │                 │        │                                     │     (every 2 min)
//        │                 ├──get job                                     │
//        │                 ├──mark as running                             ├──RefreshLastAnalysisDuration()
//        │                 └──register hooks                              │     (every 10 min)
//        │                          │                                     │
//        ├──SubmitJob(job) ─────────┘                                     ├──RequeueMustRetryJobs()
//        │                                                                │     (every 5 min)
//        │                                                                │
//       ...                                                              ...
//        │                                                                │
//   <ticker fires>                                                        │
//        │                                                                │
//        ├──check: !RunAutoAnalyze.Load() || !IsOwner()?                  │
//        │                 │                                              │
//        │                 └──> YES (ownership lost or auto-analyze disabled)
//        │                                                                │
//        ├──ClosePriorityQueue() ──────────────────────────────────────> ctx.Done()
//        │                 │                                              │
//        │                 ├──cancel context                              ├──exit loop
//        │                 ├──unlock                                      │
//        │                 ├──Wait() ◄─────────────────────────────────────┤
//        │                 │    (waits for Queue Worker to exit)          │
//        │                 │                                         resetSyncFields()
//        │                 │                                              │
//        │                 │                                              ├──set initialized=false
//        │                 │                                              ├──nil out maps
//        │                 │ ◄─────────────────────────────────────────────┘
//        │                 │    (Queue Worker exits)
//        │                 └──Done
//        │
//
// ═══════════════════════════════════════════════════════════════════════════════════════
// SCENARIO 2: Concurrent Close During Active Processing (Deadlock Prevention)
// ═══════════════════════════════════════════════════════════════════════════════════════
//
// The Close() function is specifically designed to avoid deadlock. Here's why:
//
//   Auto-Analyze Worker (Close)          syncFields.mu                  Queue Worker
//   ───────────────────────────          ─────────────                  ────────────
//   Close()
//     │
//     ├──Lock() ──────────────────> [LOCKED] ◄───────────────── ProcessDMLChanges()
//     │                                 │                              │
//     ├──check initialized              │                              │ (waiting for lock)
//     ├──cancel context ────────────────┼──────────────────────> (will see ctx.Done())
//     │                                 │                              │
//     ├──Unlock() ────────────────> [UNLOCKED]                         │
//     │                                 │                              │
//     │                                 │ ◄────────────────────────────┤
//     │                                 │                       (acquires lock)
//     │                                 │                              │
//     │                                 │                  (finishes DML processing)
//     │                                 │                              │
//     │                                 │                      (checks ctx.Done())
//     │                                 │                              │
//     │                                 │                         (exits loop)
//     │                                 │                              │
//     ├──Wait() ◄───────────────────────────────────────────────────────┤
//     │    (waits OUTSIDE lock)                               (Queue Worker exits)
//     │                                                                │
//     └──Done                                                          │
//
// CRITICAL: If Close() waited while holding the lock, it would deadlock:
//
//   Auto-Analyze Worker: holds lock → waits for Queue Worker to exit
//   Queue Worker:        tries to acquire lock → blocked forever
//   Result: DEADLOCK ❌
//
// Current design (unlock before wait):
//
//   Auto-Analyze Worker: releases lock → waits for Queue Worker
//   Queue Worker:        acquires lock → processes → checks context → exits
//   Result: Clean shutdown
//
// ═══════════════════════════════════════════════════════════════════════════════════════
// SCENARIO 3: DDL Events During Running Jobs (Must Retry Pattern)
// ═══════════════════════════════════════════════════════════════════════════════════════
//
//   Auto-Analyze Worker        Priority Queue               DDL Handler            Job Executor
//   ───────────────────        ──────────────               ───────────            ────────────
//   Pop() for table T1
//        │
//        ├──get job
//        ├──add T1 to runningJobs
//        └──register hooks
//        │
//   SubmitJob(T1) ──────────────────────────────────────────────────────────────> [Start analyzing T1]
//        │                                                                               │
//        │                                            ALTER TABLE T1 ADD INDEX           │
//        │                                                     │                         │
//        │                                            HandleDDLEvent(T1)                 │
//        │                                                     │                         │
//        │                    ├──check if T1 in runningJobs                              │
//        │                    │         │                                                │
//        │                    │         └──> YES (still running)                         │
//        │                    │                                                          │
//        │                    ├──add T1 to mustRetryJobs                                 │
//        │                    │    (don't queue now)                                     │
//        │                    │                                                          │
//        │                    └──return                                                  │
//        │                                                                               │
//        │                                                                     [Analysis completes]
//        │                                                                               │
//        │                                                                            └──Done
//
//   ...time passes (5 minutes)...
//
//   RequeueMustRetryJobs()
//        │
//        ├──get T1 from mustRetryJobs
//        ├──delete T1 from mustRetryJobs
//        ├──recreateAndPushJobForTable(T1)
//        │         │
//        │         ├──fetch new table info (with new index)
//        │         ├──create new job
//        │         └──push to queue
//        │
//        └──Done
//
//   Pop() for table T1 (2nd time)
//        │
//        └──[Analyze T1 again with new index]
//
// WHY THIS PATTERN?
// - If we queued T1 immediately while it's running, we'd analyze the same table twice concurrently
// - By marking as mustRetry, we defer the re-queue until the current analysis finishes
// - This ensures no DML changes are missed while avoiding duplicate work
//
// ═══════════════════════════════════════════════════════════════════════════════════════
// SCENARIO 4: Job Hooks After Queue Closed (Graceful Shutdown)
// ═══════════════════════════════════════════════════════════════════════════════════════
//
//   Auto-Analyze Worker                Priority Queue                        Job Executor
//   ───────────────────                ──────────────                        ────────────
//   Pop() for table T1
//        │
//        ├──mark T1 as running
//        └──return job ──────────────────────────────────────> [Start analyzing T1]
//                                                                        │
//   <ticker fires>                                                       │
//        │                                                               │
//        ├──check: !RunAutoAnalyze || !IsOwner                           │
//        │                 │                                             │
//        │                 └──> YES (ownership lost)                     │
//        │                                                               │
//        ├──ClosePriorityQueue()                                         │
//        │        │                                                      │
//        │        ├──cancel context                                      │
//        │        ├──unlock                                              │
//        │        ├──Wait() (waits for Queue Worker to exit)             │
//        │        │                                                      │
//        │        │        resetSyncFields()                             │
//        │        │             │                                        │
//        │        │             ├──set initialized=false                 │
//        │        │             ├──runningJobs = nil ◄───────────────────┼──────┐
//        │        │             ├──mustRetryJobs = nil                   │      │
//        │        │             └──Done                                  │      │
//        │        │                                                      │      │
//        │        └──Done                                                │      │
//        │                                                               │      │
//   [New owner takes over]                                               │      │
//                                                                        │      │
//                                                              [Analysis completes]
//                                                                        │      │
//                                                                 SuccessHook() │
//                                                                        │      │
//                                                                        ├──Lock()
//                                                                        ├──check if runningJobs == nil
//                                                                        │         │
//                                                                        │         └──> YES (queue closed)
//                                                                        ├──return (no-op)
//                                                                        └──Unlock()
//
// SAFETY GUARANTEES:
// - Running jobs are allowed to complete even after ClosePriorityQueue()
// - Hooks check for nil maps before accessing them
// - No crashes, no panics, graceful degradation
//
// ═══════════════════════════════════════════════════════════════════════════════════════
// SCENARIO 5: Ownership Lost During Initialization (Delayed Cleanup)
// ═══════════════════════════════════════════════════════════════════════════════════════
//
// The Auto-Analyze Worker is single-threaded, so ClosePriorityQueue() CANNOT be called
// while Initialize() is running. However, ownership can change DURING initialization,
// leading to a queue that's fully initialized even though we're no longer the owner.
//
// IMPORTANT: This is a known race condition but is acceptable because:
// 1. The queue will be closed on the NEXT ticker (within ~3 seconds)
// 2. The new owner will have its own queue
// 3. At worst, we waste resources for a few seconds
//
//   Auto-Analyze Worker                 Ownership Changes               Priority Queue State
//   ───────────────────                 ─────────────────               ────────────────────
//   <ticker fires at T=0s>
//        │
//        ├──check: IsOwner() → YES
//        │
//   AnalyzeHighestPriorityTables()
//        │
//        ├──Initialize(ctx)
//        │        │
//        │        ├──rebuildWithoutLock()
//        │        │    ┌──────────────┐
//        │        │    │  ~1 min for  │     [T=30s: Ownership transferred
//        │        │    │  1M tables   │      to another instance]
//        │        │    │              │           │
//        │        │    │  (still      │           ├──Other instance
//        │        │    │  building... │           │  becomes owner
//        │        │    │  unaware of  │           │
//        │        │    │  ownership   │           └──This instance is
//        │        │    │  loss)       │              no longer owner!
//        │        │    └──────────────┘
//        │        │
//        │        ├──set initialized=true
//        │        ├──spawn run() ──────────────────────────────> Queue Worker running
//        │        │                                              (but shouldn't be!)
//        │        └──return
//        │
//        └──return from HandleAutoAnalyze()
//           (Auto-Analyze Worker now unblocked)
//
//   <ticker fires at T=63s>
//        │
//        ├──check: IsOwner() → NO
//        │
//        ├──ClosePriorityQueue() ──────────────────────────> Queue closed
//        │                                                   (cleans up state)
//        └──Done
//
// WHY THIS IS ACCEPTABLE:
// - The queue runs for at most one ticker interval (~3 seconds) after ownership loss
// - No correctness issues: the old owner's queue and new owner's queue are independent
// - Resource waste is minimal and temporary
// - Alternative (checking ownership during Initialize and Close) would add complexity for little gain
// - It is possible to analyze the same table twice in this short window (Close does not wait for running jobs to finish), but this is acceptable
//
// SAFETY GUARANTEES:
// - No data races: mutex protects all state transitions
// - No concurrent Initialize/Close: Auto-Analyze Worker is single-threaded
// - Graceful degradation: Queue can be re-initialized after Close()
//
//nolint:fieldalignment
type AnalysisPriorityQueue struct {
	ctx         context.Context
	statsHandle statstypes.StatsHandle
	calculator  *PriorityCalculator

	wg util.WaitGroupWrapper

	// syncFields is a substructure to hold fields protected by mu.
	syncFields struct {
		// mu is used to protect the following fields.
		mu sync.RWMutex
		// Because the Initialize and Close functions can be called concurrently,
		// we need to protect the cancel function to avoid a data race.
		cancel context.CancelFunc
		inner  pqHeap
		// runningJobs is a map to store the running jobs. Used to avoid duplicate jobs.
		runningJobs map[int64]struct{}
		// lastDMLUpdateFetchTimestamp is the timestamp of the last DML update fetch.
		lastDMLUpdateFetchTimestamp uint64
		// mustRetryJobs is a map to store the jobs that must be retried.
		// For now, we have two types of jobs:
		// 1. The jobs that failed to be executed. We have to try them later.
		// 2. The jobs that failed to enqueue due to the ongoing analysis,
		//    particularly for tables with new indexes created during this process.
		// We will requeue the must retry jobs periodically.
		mustRetryJobs map[int64]struct{}
		// initialized is a flag to check if the queue is initialized.
		initialized bool
	}
}

// NewAnalysisPriorityQueue creates a new AnalysisPriorityQueue.
func NewAnalysisPriorityQueue(handle statstypes.StatsHandle) *AnalysisPriorityQueue {
	queue := &AnalysisPriorityQueue{
		statsHandle: handle,
		calculator:  NewPriorityCalculator(),
	}

	return queue
}
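
// As a rough, illustrative sketch only of the lifecycle described in the type-level
// comment above: the owner initializes the queue, pops jobs while it stays the owner,
// and closes the queue when ownership is lost. The isOwner and submit parameters and
// the fixed 3-second ticker below are simplified placeholders, not the real wiring in
// domain.autoAnalyzeWorker().
//
//	func ownerLoopSketch(ctx context.Context, handle statstypes.StatsHandle, isOwner func() bool, submit func(AnalysisJob)) {
//		pq := NewAnalysisPriorityQueue(handle)
//		defer pq.Close()
//		ticker := time.NewTicker(3 * time.Second) // stands in for the stats lease interval
//		defer ticker.Stop()
//		for range ticker.C {
//			if !isOwner() {
//				pq.Close() // ownership lost: tear down; the queue can be re-initialized later
//				continue
//			}
//			if !pq.IsInitialized() {
//				if err := pq.Initialize(ctx); err != nil {
//					continue
//				}
//			}
//			job, err := pq.Pop() // marks the job as running and registers its hooks
//			if err != nil {
//				continue // nothing to do this round
//			}
//			submit(job) // the executor completes the job and triggers the hooks
//		}
//	}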

// IsInitialized checks if the priority queue is initialized.
func (pq *AnalysisPriorityQueue) IsInitialized() bool {
	pq.syncFields.mu.RLock()
	defer pq.syncFields.mu.RUnlock()

	return pq.syncFields.initialized
}

// Initialize initializes the priority queue.
// NOTE: This function is thread-safe.
func (pq *AnalysisPriorityQueue) Initialize(ctx context.Context) (err error) {
	pq.syncFields.mu.Lock()
	if pq.syncFields.initialized {
		statslogutil.StatsLogger().Warn("Priority queue already initialized")
		pq.syncFields.mu.Unlock()
		return nil
	}

	start := time.Now()
	defer func() {
		if err != nil {
			statslogutil.StatsLogger().Error("Failed to initialize priority queue", zap.Error(err), zap.Duration("duration", time.Since(start)))
			return
		}
		statslogutil.StatsLogger().Info("Priority queue initialized", zap.Duration("duration", time.Since(start)))
	}()

	// Before doing any heavy work, check if the context is already canceled.
	// NOTE: This can happen if the instance starts exiting.
	// This helps us avoid initializing the queue during shutdown.
	// For example, if the auto-analyze ticker fires just before shutdown
	// and the thread is delayed before it calls Initialize,
	// Close may be called before initialization really starts.
	// In this case, we should not proceed with initialization. Technically,
	// rebuildWithoutLock will handle this since it also checks the context.
	// However, it is better to check here to make it more explicit and avoid unnecessary work.
	if ctx.Err() != nil {
		pq.syncFields.mu.Unlock()
		pq.Close()
		return errors.Trace(ctx.Err())
	}

	if err := pq.rebuildWithoutLock(ctx); err != nil {
		pq.syncFields.mu.Unlock()
		pq.Close()
		return errors.Trace(err)
	}
	ctx, cancel := context.WithCancel(ctx)
	pq.ctx = ctx
	pq.syncFields.cancel = cancel
	pq.syncFields.runningJobs = make(map[int64]struct{})
	pq.syncFields.mustRetryJobs = make(map[int64]struct{})
	pq.syncFields.initialized = true
	// Start the goroutine that maintains the priority queue here, while still holding
	// the lock, to avoid a data race when Initialize and Close are called concurrently.
	pq.wg.RunWithRecover(pq.run, nil)
	pq.syncFields.mu.Unlock()

	return nil
}

// Rebuild rebuilds the priority queue.
// NOTE: This function is thread-safe.
func (pq *AnalysisPriorityQueue) Rebuild() error {
	pq.syncFields.mu.Lock()
	defer pq.syncFields.mu.Unlock()

	if !pq.syncFields.initialized {
		return errors.New(notInitializedErrMsg)
	}

	return pq.rebuildWithoutLock(pq.ctx)
}

// rebuildWithoutLock rebuilds the priority queue without acquiring the lock itself.
// NOTE: Please hold the lock before calling this function.
func (pq *AnalysisPriorityQueue) rebuildWithoutLock(ctx context.Context) error {
	pq.syncFields.inner = newHeap()

	// We need to fetch the next check version with offset before fetching all tables and building analysis jobs.
	// Otherwise, we may miss some DML changes that happened during the process, because this operation takes time.
	// For example, one million tables take about one minute to fetch and turn into analysis jobs.
	// This guarantees that we will not miss any DML changes, but it may cause some DML changes to be processed twice.
	// That is acceptable since processing DML changes is idempotent.
	nextCheckVersionWithOffset := pq.statsHandle.GetNextCheckVersionWithOffset()
	err := pq.fetchAllTablesAndBuildAnalysisJobs(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	// Update the last fetch timestamp of DML updates.
	pq.syncFields.lastDMLUpdateFetchTimestamp = nextCheckVersionWithOffset

	return nil
}

// fetchAllTablesAndBuildAnalysisJobs builds analysis jobs for all eligible tables and partitions.
// NOTE: Please hold the lock before calling this function.
func (pq *AnalysisPriorityQueue) fetchAllTablesAndBuildAnalysisJobs(ctx context.Context) error {
	return statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
		parameters := exec.GetAutoAnalyzeParameters(sctx)
		autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[vardef.TiDBAutoAnalyzeRatio])
		pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())
		// Query locked tables once to minimize overhead.
		// Outdated lock info is acceptable as we verify table lock status pre-analysis.
		lockedTables, err := lockstats.QueryLockedTables(statsutil.StatsCtx, sctx)
		if err != nil {
			return err
		}

		is := sctx.GetLatestInfoSchema().(infoschema.InfoSchema)
		// Get current timestamp from the session context.
		currentTs, err := statsutil.GetStartTS(sctx)
		if err != nil {
			return err
		}
		jobFactory := NewAnalysisJobFactory(sctx, autoAnalyzeRatio, currentTs)

		// Get all schemas except the memory and system database.
		tbls := make([]*model.TableInfo, 0, 512)
		// This only occurs during priority queue initialization, which is infrequent.
		halfCPUNum := runtime.NumCPU() / 2
		start := time.Now()
		if err := meta.IterAllTables(
			ctx,
			sctx.GetStore(),
			currentTs,
			halfCPUNum,
			// Make sure this function is thread-safe.
			func(info *model.TableInfo) error {
				// Ignore the memory and system database.
				db, ok := is.SchemaByID(info.DBID)
				if !ok || metadef.IsMemOrSysDB(db.Name.L) {
					return nil
				}
				tbls = append(tbls, info)
				return nil
			}); err != nil {
			return errors.Trace(err)
		}
		statslogutil.StatsLogger().Info("Fetched all tables", zap.Int("tableCount", len(tbls)), zap.Duration("duration", time.Since(start)))
		// Add an assertion to verify that we have collected all tables by comparing against a second method.
		// The method below is much slower than the one above, so we only use it for verification.
		intest.AssertFunc(func() bool {
			dbs := is.AllSchemaNames()
			verifyTbls := make([]*model.TableInfo, 0, 512)
			for _, db := range dbs {
				// Ignore the memory and system database.
				if metadef.IsMemOrSysDB(db.L) {
					continue
				}

				tbls, err := is.SchemaTableInfos(context.Background(), db)
				if err != nil {
					panic(err)
				}
				verifyTbls = append(verifyTbls, tbls...)
			}
			return len(verifyTbls) == len(tbls)
		})

		// We need to check every partition of every table to see if it needs to be analyzed.
		for _, tblInfo := range tbls {
			// If the table is locked, skip analyzing all of its partitions.
			if _, ok := lockedTables[tblInfo.ID]; ok {
				continue
			}

			if tblInfo.IsView() {
				continue
			}

			pi := tblInfo.GetPartitionInfo()
			if pi == nil {
				stats, found := pq.statsHandle.GetNonPseudoPhysicalTableStats(tblInfo.ID)
				if !found {
					continue
				}

				job := jobFactory.CreateNonPartitionedTableAnalysisJob(
					tblInfo,
					stats,
				)
				err := pq.pushWithoutLock(job)
				if err != nil {
					return err
				}
				continue
			}

			// Only analyze the partitions that have not been locked.
			partitionDefs := make([]model.PartitionDefinition, 0, len(pi.Definitions))
			for _, def := range pi.Definitions {
				if _, ok := lockedTables[def.ID]; !ok {
					partitionDefs = append(partitionDefs, def)
				}
			}
			partitionStats := GetPartitionStats(pq.statsHandle, partitionDefs)
			// If the prune mode is static, we need to analyze every partition as a separate table.
			if pruneMode == variable.Static {
				for pIDAndName, stats := range partitionStats {
					job := jobFactory.CreateStaticPartitionAnalysisJob(
						tblInfo,
						pIDAndName.ID,
						stats,
					)
					err := pq.pushWithoutLock(job)
					if err != nil {
						return err
					}
				}
			} else {
				globalStats, found := pq.statsHandle.GetNonPseudoPhysicalTableStats(tblInfo.ID)
				if !found {
					continue
				}
				job := jobFactory.CreateDynamicPartitionedTableAnalysisJob(
					tblInfo,
					globalStats,
					partitionStats,
				)
				err := pq.pushWithoutLock(job)
				if err != nil {
					return err
				}
			}
		}

		return nil
	}, statsutil.FlagWrapTxn)
}

// run maintains the priority queue.
func (pq *AnalysisPriorityQueue) run() {
	// Make sure to reset the fields when exiting the goroutine.
	defer pq.resetSyncFields()

	dmlChangesFetchInterval := time.NewTicker(dmlChangesFetchInterval)
	defer dmlChangesFetchInterval.Stop()
	timeRefreshInterval := time.NewTicker(lastAnalysisDurationRefreshInterval)
	defer timeRefreshInterval.Stop()
	mustRetryJobRequeueInterval := time.NewTicker(mustRetryJobRequeueInterval)
	defer mustRetryJobRequeueInterval.Stop()

	// HACK: Inject a failpoint to speed up DML changes processing for testing.
	// This simulates a scenario where DML changes are processed very frequently to verify
	// that the priority queue can be closed gracefully without deadlock.
	// The known deadlock occurs in the following scenario:
	// 1. The priority queue is closing: it holds the lock and waits for the `run` goroutine to exit.
	// 2. The `run` goroutine tries to acquire the lock to process DML changes.
	// 3. The lock is unavailable, so the `run` goroutine blocks.
	// 4. The Close() function waits for the `run` goroutine to exit, but the `run` goroutine
	//    is waiting for the lock held by Close(). This causes a deadlock.
	// So in this failpoint, we use a separate ticker to ensure that DML changes are processed frequently.
	// And it does not check for context cancellation in every iteration to maximize the chance of deadlock.
	failpoint.Inject("tryBlockCloseAnalysisPriorityQueue", func() {
		rapidTicker := time.NewTicker(time.Millisecond * 10)
		defer rapidTicker.Stop()
		waitFor := time.After(time.Second * 5)
		for {
			select {
			// Should exit after 5 seconds to avoid blocking forever.
			case <-waitFor:
				return
			case <-rapidTicker.C:
				pq.ProcessDMLChanges()
			}
		}
	})

	// Inject a panic point for testing.
	failpoint.Inject("panicInAnalysisPriorityQueueRun", func() {
		panic("panic injected in AnalysisPriorityQueue.run")
	})

	for {
		// NOTE: We check the context error here to handle the case where the context has been canceled,
		// allowing us to exit the goroutine as soon as possible.
		if ctxErr := pq.ctx.Err(); ctxErr != nil {
			statslogutil.StatsLogger().Info("Priority queue stopped", zap.Error(ctxErr))
			return
		}

		select {
		case <-pq.ctx.Done():
			statslogutil.StatsLogger().Info("Priority queue stopped")
			return
		case <-dmlChangesFetchInterval.C:
			queueSamplerLogger().Info("Start to fetch DML changes of tables")
			pq.ProcessDMLChanges()
		case <-timeRefreshInterval.C:
			queueSamplerLogger().Info("Start to refresh last analysis durations of jobs")
			pq.RefreshLastAnalysisDuration()
		case <-mustRetryJobRequeueInterval.C:
			queueSamplerLogger().Info("Start to requeue must retry jobs")
			pq.RequeueMustRetryJobs()
		}
	}
}

// ProcessDMLChanges processes DML changes.
// NOTE: This function is thread-safe.
// Performance: scanning all table stats and processing the DML changes takes less than about 100ms for one million tables.
// Exported for testing.
func (pq *AnalysisPriorityQueue) ProcessDMLChanges() {
	pq.syncFields.mu.Lock()
	defer pq.syncFields.mu.Unlock()

	if err := statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
		start := time.Now()
		defer func() {
			duration := time.Since(start)
			if duration > slowLogThreshold {
				queueSamplerLogger().Info("DML changes processed", zap.Duration("duration", duration))
			}
		}()

		parameters := exec.GetAutoAnalyzeParameters(sctx)
		// We need to fetch the next check version with offset before fetching new DML changes.
		// Otherwise, we may miss some DML changes that happened during the process.
		newMaxVersion := pq.statsHandle.GetNextCheckVersionWithOffset()
		// Query locked tables once to minimize overhead.
		// Outdated lock info is acceptable as we verify table lock status pre-analysis.
		lockedTables, err := lockstats.QueryLockedTables(statsutil.StatsCtx, sctx)
		if err != nil {
			return err
		}
		values := pq.statsHandle.Values()
		lastFetchTimestamp := pq.syncFields.lastDMLUpdateFetchTimestamp
		for _, value := range values {
			// We only process the tables that have been updated,
			// so we only need the tables whose version is greater than the last fetch timestamp.
			if value.Version > lastFetchTimestamp {
				err := pq.processTableStats(sctx, value, parameters, lockedTables)
				if err != nil {
					statslogutil.StatsErrVerboseSampleLogger().Error(
						"Failed to process table stats",
						zap.Error(err),
						zap.Int64("tableID", value.PhysicalID),
					)
				}
			}
		}

		// Only update if we've seen a newer version.
		if newMaxVersion > lastFetchTimestamp {
			queueSamplerLogger().Info("Updating last fetch timestamp", zap.Uint64("new_max_version", newMaxVersion))
			pq.syncFields.lastDMLUpdateFetchTimestamp = newMaxVersion
		}
		return nil
	}, statsutil.FlagWrapTxn); err != nil {
		statslogutil.StatsErrVerboseSampleLogger().Error("Failed to process DML changes", zap.Error(err))
	}
}

// NOTE: Please hold the lock before calling this function.
func (pq *AnalysisPriorityQueue) processTableStats(
	sctx sessionctx.Context,
	stats *statistics.Table,
	parameters map[string]string,
	lockedTables map[int64]struct{},
) error {
	// Check if the table is eligible for analysis first to avoid unnecessary work.
	if !stats.IsEligibleForAnalysis() {
		return nil
	}

	autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[vardef.TiDBAutoAnalyzeRatio])
	// Get current timestamp from the session context.
	currentTs, err := statsutil.GetStartTS(sctx)
	if err != nil {
		return errors.Trace(err)
	}
	jobFactory := NewAnalysisJobFactory(sctx, autoAnalyzeRatio, currentTs)
	is := sctx.GetLatestInfoSchema().(infoschema.InfoSchema)
	pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())

	var job AnalysisJob
	// For dynamic partitioned tables, we need to recreate the job if the partition stats are updated.
	// This means we will always enter the tryCreateJob branch for these partitions.
	// Since we store the stats meta for each partition and the parent table, there may be redundant calculations.
	// This is acceptable for now, but in the future, we may consider separating the analysis job for each partition.
	job, ok, _ := pq.syncFields.inner.getByKey(stats.PhysicalID)
	if !ok {
		job = pq.tryCreateJob(is, stats, pruneMode, jobFactory, lockedTables)
	} else {
		// Skip analysis if the table is locked.
		// Dynamic partitioned tables are managed in the tryCreateJob branch.
		// Non-partitioned tables can be skipped entirely here.
		// For static partitioned tables, skip either the locked partition or the whole table if all partitions are locked.
		// For dynamic partitioned tables, if the parent table is locked, we skip the whole table here as well.
		if _, ok := lockedTables[stats.PhysicalID]; ok {
			// Clean up the job if the table is locked.
			err := pq.syncFields.inner.delete(job)
			if err != nil {
				statslogutil.StatsErrVerboseSampleLogger().Error(
					"Failed to delete job from priority queue",
					zap.Error(err),
					zap.String("job", job.String()),
				)
			}
			return nil
		}
		job = pq.tryUpdateJob(is, stats, job, jobFactory)
	}
	return pq.pushWithoutLock(job)
}

func (pq *AnalysisPriorityQueue) tryCreateJob(
	is infoschema.InfoSchema,
	stats *statistics.Table,
	pruneMode variable.PartitionPruneMode,
	jobFactory *AnalysisJobFactory,
	lockedTables map[int64]struct{},
) (job AnalysisJob) {
	if stats == nil {
		return nil
	}

	tableInfo, ok := pq.statsHandle.TableInfoByID(is, stats.PhysicalID)
	if !ok {
		statslogutil.StatsLogger().Warn(
			"Table info not found for table id",
			zap.Int64("tableID", stats.PhysicalID),
		)
		return nil
	}
	tableMeta := tableInfo.Meta()
	partitionedTable := tableMeta.GetPartitionInfo()
	if partitionedTable == nil {
		// If the table is locked, we do not analyze it.
		if _, ok := lockedTables[tableMeta.ID]; ok {
			return nil
		}
		job = jobFactory.CreateNonPartitionedTableAnalysisJob(
			tableMeta,
			stats,
		)
	} else {
		partitionDefs := partitionedTable.Definitions
		if pruneMode == variable.Static {
			var partitionDef model.PartitionDefinition
			found := false
			// Find the specific partition definition.
			for _, def := range partitionDefs {
				if def.ID == stats.PhysicalID {
					partitionDef = def
					found = true
					break
				}
			}
			if !found {
				// This usually indicates that the stats are for the parent (global) table.
				// In static partition mode, we do not analyze the parent table.
				// TODO: add tests to verify this behavior.
				return nil
			}
			// If the partition is locked, we do not analyze it.
			if _, ok := lockedTables[partitionDef.ID]; ok {
				return nil
			}
			job = jobFactory.CreateStaticPartitionAnalysisJob(
				tableMeta,
				partitionDef.ID,
				stats,
			)
		} else {
			// If the table is locked, we do not analyze it.
			// NOTE: the table meta is the parent table meta.
			if _, ok := lockedTables[tableMeta.ID]; ok {
				return nil
			}

			// Only analyze the partitions that have not been locked.
			// Special case for dynamic partitioned tables:
			// 1. Initially, neither the table nor any partitions are locked.
			// 2. Once partition p1 reaches the auto-analyze threshold, a job is created for the entire table.
			// 3. At this point, partition p1 is locked.
			// 4. There are no further partitions requiring analysis for this table because the only partition needing analysis is locked.
			//
			// Normally, we would remove the table's job in this scenario, but that is not handled here.
			// The primary responsibility of this function is to create jobs for tables needing analysis,
			// and deleting jobs falls outside its scope.
			//
			// This behavior is acceptable, as lock statuses will be validated before running the analysis.
			// So let's keep it simple and ignore this edge case here.
			filteredPartitionDefs := make([]model.PartitionDefinition, 0, len(partitionDefs))
			for _, def := range partitionDefs {
				if _, ok := lockedTables[def.ID]; !ok {
					filteredPartitionDefs = append(filteredPartitionDefs, def)
				}
			}

			// Get global stats for the dynamic partitioned table.
			globalStats, found := pq.statsHandle.GetNonPseudoPhysicalTableStats(tableMeta.ID)
			if !found {
				return nil
			}
			partitionStats := GetPartitionStats(pq.statsHandle, filteredPartitionDefs)
			job = jobFactory.CreateDynamicPartitionedTableAnalysisJob(
				tableMeta,
				globalStats,
				partitionStats,
			)
		}
	}
	return job
}

func (pq *AnalysisPriorityQueue) tryUpdateJob(
	is infoschema.InfoSchema,
	stats *statistics.Table,
	oldJob AnalysisJob,
	jobFactory *AnalysisJobFactory,
) AnalysisJob {
	if stats == nil {
		return nil
	}
	intest.Assert(oldJob != nil)
	indicators := oldJob.GetIndicators()

	// For a dynamic partitioned table, there is no way to update only the partitions that have changed,
	// so we recreate the whole job for it.
	if IsDynamicPartitionedTableAnalysisJob(oldJob) {
		tableInfo, ok := pq.statsHandle.TableInfoByID(is, stats.PhysicalID)
		if !ok {
			statslogutil.StatsLogger().Warn(
				"Table info not found during updating job",
				zap.Int64("tableID", stats.PhysicalID),
				zap.String("job", oldJob.String()),
			)
			return nil
		}
		tableMeta := tableInfo.Meta()
		partitionedTable := tableMeta.GetPartitionInfo()
		partitionDefs := partitionedTable.Definitions
		partitionStats := GetPartitionStats(pq.statsHandle, partitionDefs)
		return jobFactory.CreateDynamicPartitionedTableAnalysisJob(
			tableMeta,
			stats,
			partitionStats,
		)
	}
	// Otherwise, we update the indicators of the job.
	indicators.ChangePercentage = jobFactory.CalculateChangePercentage(stats)
	indicators.TableSize = jobFactory.CalculateTableSize(stats)
	oldJob.SetIndicators(indicators)
	return oldJob
}

// RequeueMustRetryJobs requeues the must retry jobs.
// Exported for testing.
func (pq *AnalysisPriorityQueue) RequeueMustRetryJobs() {
	pq.syncFields.mu.Lock()
	defer pq.syncFields.mu.Unlock()

	if err := statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
		start := time.Now()
		defer func() {
			duration := time.Since(start)
			if duration > slowLogThreshold {
				queueSamplerLogger().Info("Must retry jobs requeued", zap.Duration("duration", duration))
			}
		}()

		is := sctx.GetLatestInfoSchema().(infoschema.InfoSchema)
		for tableID := range pq.syncFields.mustRetryJobs {
			// NOTE: Delete the job first to ensure it can be added back to the queue.
			delete(pq.syncFields.mustRetryJobs, tableID)
			tblInfo, ok := pq.statsHandle.TableInfoByID(is, tableID)
			if !ok {
				statslogutil.StatsLogger().Warn("Table info not found during requeueing must retry jobs", zap.Int64("tableID", tableID))
				continue
			}
			err := pq.recreateAndPushJobForTable(sctx, tblInfo.Meta())
			if err != nil {
				statslogutil.StatsErrVerboseSampleLogger().Error("Failed to recreate and push job for table", zap.Error(err), zap.Int64("tableID", tableID))
				continue
			}
		}
		return nil
	}, statsutil.FlagWrapTxn); err != nil {
		statslogutil.StatsErrVerboseSampleLogger().Error("Failed to requeue must retry jobs", zap.Error(err))
	}
}

// RefreshLastAnalysisDuration refreshes the last analysis duration of all jobs in the priority queue.
// NOTE: This function is thread-safe.
// Exported for testing.
func (pq *AnalysisPriorityQueue) RefreshLastAnalysisDuration() {
	pq.syncFields.mu.Lock()
	defer pq.syncFields.mu.Unlock()

	if err := statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
		start := time.Now()
		defer func() {
			duration := time.Since(start)
			if duration > slowLogThreshold {
				queueSamplerLogger().Info("Last analysis duration refreshed", zap.Duration("duration", duration))
			}
		}()
		jobs := pq.syncFields.inner.list()
		currentTs, err := statsutil.GetStartTS(sctx)
		if err != nil {
			return errors.Trace(err)
		}
		jobFactory := NewAnalysisJobFactory(sctx, 0, currentTs)
		// TODO: We can directly rebuild the priority queue instead of updating the indicators of each job.
		for _, job := range jobs {
			indicators := job.GetIndicators()
			tableStats, ok := pq.statsHandle.Get(job.GetTableID())
			if !ok {
				statslogutil.StatsLogger().Warn("Table stats not found during refreshing last analysis duration",
					zap.Int64("tableID", job.GetTableID()),
					zap.String("job", job.String()),
				)
				// Delete the job from the queue since its table is missing. This is a safeguard:
				// DDL events should have already cleaned up jobs for dropped tables.
				err := pq.syncFields.inner.delete(job)
				if err != nil {
					statslogutil.StatsErrVerboseSampleLogger().Error("Failed to delete job from priority queue",
						zap.Error(err),
						zap.String("job", job.String()),
					)
				}
				continue
			}
			indicators.LastAnalysisDuration = jobFactory.GetTableLastAnalyzeDuration(tableStats)
			job.SetIndicators(indicators)
			job.SetWeight(pq.calculator.CalculateWeight(job))
			if err := pq.syncFields.inner.update(job); err != nil {
				statslogutil.StatsErrVerboseSampleLogger().Error("Failed to update job in priority queue",
					zap.Error(err),
					zap.String("job", job.String()),
				)
			}
		}
		return nil
	}, statsutil.FlagWrapTxn); err != nil {
		statslogutil.StatsErrVerboseSampleLogger().Error("Failed to refresh last analysis duration", zap.Error(err))
	}
}

// GetRunningJobs returns the running jobs.
// NOTE: This function is thread-safe.
// Exported for testing.
func (pq *AnalysisPriorityQueue) GetRunningJobs() map[int64]struct{} {
	pq.syncFields.mu.RLock()
	defer pq.syncFields.mu.RUnlock()

	runningJobs := make(map[int64]struct{}, len(pq.syncFields.runningJobs))
	for id := range pq.syncFields.runningJobs {
		runningJobs[id] = struct{}{}
	}
	return runningJobs
}

func (pq *AnalysisPriorityQueue) pushWithoutLock(job AnalysisJob) error {
	if job == nil {
		return nil
	}
	// Skip the must retry jobs.
	// This avoids requeueing a must retry job before the next requeue interval.
	// Otherwise, we may requeue the same job multiple times in a short time.
	if _, ok := pq.syncFields.mustRetryJobs[job.GetTableID()]; ok {
		return nil
	}

	// Skip the currently running jobs.
	// Safety:
	// Let's say we have a job in the priority queue, and it is already running.
	// Then we will not add the same job to the priority queue again. Otherwise, we would analyze the same table twice.
	// If the job is finished, we will remove it from the running jobs.
	// Then the next time we process the DML changes, we will add the job to the priority queue (if it is still needed).
	// In this process, we will not miss any DML changes of the table, because when we delete the table from the current running jobs,
	// we guarantee that the job is finished and the stats cache is updated (the last step of the analysis job is to update the stats cache).
	if _, ok := pq.syncFields.runningJobs[job.GetTableID()]; ok {
		// Mark the job as must retry,
		// because the job can potentially be analyzed in the near future.
		// For example, the table has new indexes added while the job is running.
		pq.syncFields.mustRetryJobs[job.GetTableID()] = struct{}{}
		return nil
	}
	// We apply a penalty to larger tables, which can potentially result in a negative weight.
	// To prevent this, we filter out any negative weights. Under normal circumstances, table sizes should not be negative.
	weight := pq.calculator.CalculateWeight(job)
	if weight <= 0 {
		statslogutil.StatsSampleLogger().Warn(
			"Table gets a negative weight",
			zap.Float64("weight", weight),
			zap.Stringer("job", job),
		)
	}
	job.SetWeight(weight)
	return pq.syncFields.inner.addOrUpdate(job)
}

// Pop pops a job from the priority queue and marks it as running.
// NOTE: This function is thread-safe.
func (pq *AnalysisPriorityQueue) Pop() (AnalysisJob, error) {
	pq.syncFields.mu.Lock()
	defer pq.syncFields.mu.Unlock()
	if !pq.syncFields.initialized {
		return nil, errors.New(notInitializedErrMsg)
	}

	job, err := pq.syncFields.inner.pop()
	if err != nil {
		return nil, errors.Trace(err)
	}
	pq.syncFields.runningJobs[job.GetTableID()] = struct{}{}

	job.RegisterSuccessHook(func(j AnalysisJob) {
		pq.syncFields.mu.Lock()
		defer pq.syncFields.mu.Unlock()
		// During owner switch, the priority queue is closed and its fields are reset to nil.
		// We allow running jobs to complete normally rather than stopping them, so this nil
		// check is expected when the job finishes after the switch.
		if pq.syncFields.runningJobs == nil {
			return
		}
		delete(pq.syncFields.runningJobs, j.GetTableID())
	})
	job.RegisterFailureHook(func(j AnalysisJob, needRetry bool) {
		pq.syncFields.mu.Lock()
		defer pq.syncFields.mu.Unlock()
		// During owner switch, the priority queue is closed and its fields are reset to nil.
		// We allow running jobs to complete normally rather than stopping them, so this nil check
		// is expected when jobs finish after the switch. Failed jobs will be handled by the next
		// initialization, so we can safely ignore them here.
		if pq.syncFields.runningJobs == nil || pq.syncFields.mustRetryJobs == nil {
			return
		}
		// Mark the job as failed and remove it from the running jobs.
		delete(pq.syncFields.runningJobs, j.GetTableID())
		if needRetry {
			pq.syncFields.mustRetryJobs[j.GetTableID()] = struct{}{}
		}
	})
	return job, nil
}

// PeekForTest peeks the top job from the priority queue.
// Exported for testing.
func (pq *AnalysisPriorityQueue) PeekForTest() (AnalysisJob, error) {
	pq.syncFields.mu.Lock()
	defer pq.syncFields.mu.Unlock()
	if !pq.syncFields.initialized {
		return nil, errors.New(notInitializedErrMsg)
	}

	return pq.syncFields.inner.peek()
}

// IsEmptyForTest checks whether the priority queue is empty.
// NOTE: This function is thread-safe.
// Exported for testing.
func (pq *AnalysisPriorityQueue) IsEmptyForTest() (bool, error) {
	pq.syncFields.mu.RLock()
	defer pq.syncFields.mu.RUnlock()
	if !pq.syncFields.initialized {
		return false, errors.New(notInitializedErrMsg)
	}

	return pq.syncFields.inner.isEmpty(), nil
}

// Len returns the number of jobs in the priority queue.
// NOTE: This function is thread-safe.
func (pq *AnalysisPriorityQueue) Len() (int, error) {
	pq.syncFields.mu.RLock()
	defer pq.syncFields.mu.RUnlock()
	if !pq.syncFields.initialized {
		return 0, errors.New(notInitializedErrMsg)
	}

	return pq.syncFields.inner.len(), nil
}

// Snapshot returns a snapshot of all the jobs in the priority queue.
func (pq *AnalysisPriorityQueue) Snapshot() (
	snapshot statstypes.PriorityQueueSnapshot,
	err error,
) {
	pq.syncFields.mu.RLock()
	defer pq.syncFields.mu.RUnlock()
	if !pq.syncFields.initialized {
		return statstypes.PriorityQueueSnapshot{}, errors.New(notInitializedErrMsg)
	}

	currentJobs := pq.syncFields.inner.list()
	mustRetryTables := make([]int64, 0, len(pq.syncFields.mustRetryJobs))
	for tableID := range pq.syncFields.mustRetryJobs {
		mustRetryTables = append(mustRetryTables, tableID)
	}

	jsonJobs := make([]statstypes.AnalysisJobJSON, len(currentJobs))
	for i, job := range currentJobs {
		jsonJobs[i] = job.AsJSON()
	}
	// Sort by the weight in descending order.
	sort.Slice(jsonJobs, func(i, j int) bool {
		return jsonJobs[i].Weight > jsonJobs[j].Weight
	})

	return statstypes.PriorityQueueSnapshot{
		CurrentJobs:     jsonJobs,
		MustRetryTables: mustRetryTables,
	}, nil
}

// Close closes the priority queue.
// NOTE: This function is thread-safe.
// WARNING: Please make sure to avoid calling Close concurrently with Initialize to prevent potential concurrency issues.
func (pq *AnalysisPriorityQueue) Close() {
	pq.syncFields.mu.Lock()
	if !pq.syncFields.initialized {
		pq.syncFields.mu.Unlock()
		return
	}

	// Check if the cancel function was set during initialization.
	if pq.syncFields.cancel != nil {
		pq.syncFields.cancel()
	}
	pq.syncFields.mu.Unlock()

	// NOTE: We should wait outside the lock to avoid deadlock.
	pq.wg.Wait()
}
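
// SCENARIO 2 in the type-level comment explains why Close cancels the context while
// holding the lock but calls Wait only after releasing it. The snippet below is a
// minimal, self-contained sketch of that "unlock before wait" pattern, shown only for
// illustration; it uses plain sync primitives rather than util.WaitGroupWrapper and is
// not part of this package's behavior.
//
//	func unlockBeforeWaitSketch() {
//		var (
//			mu sync.Mutex
//			wg sync.WaitGroup
//		)
//		ctx, cancel := context.WithCancel(context.Background())
//		wg.Add(1)
//		go func() {
//			defer wg.Done()
//			for {
//				mu.Lock()
//				// ... one round of work; a closer waiting while holding mu would deadlock here ...
//				mu.Unlock()
//				select {
//				case <-ctx.Done():
//					return
//				case <-time.After(10 * time.Millisecond):
//				}
//			}
//		}()
//		mu.Lock()
//		cancel() // signal shutdown under the lock
//		mu.Unlock()
//		wg.Wait() // wait outside the lock so the worker can finish its round and exit
//	}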

// resetSyncFields resets the synchronized fields of the priority queue.
func (pq *AnalysisPriorityQueue) resetSyncFields() {
	pq.syncFields.mu.Lock()
	defer pq.syncFields.mu.Unlock()
	// Reset the initialized flag to allow the priority queue to be closed and re-initialized.
	pq.syncFields.initialized = false
	// The remaining fields will be reset again when the priority queue is re-initialized,
	// but we also reset them here for double safety.
	pq.syncFields.inner = nil
	pq.syncFields.runningJobs = nil
	pq.syncFields.mustRetryJobs = nil
	pq.syncFields.lastDMLUpdateFetchTimestamp = 0
	pq.syncFields.cancel = nil
}