Files
tidb/pkg/domain/affinity/manager.go

170 lines
4.5 KiB
Go

// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package affinity
import (
"context"
"sync"
pdhttp "github.com/tikv/pd/client/http"
)
// Manager manages affinity groups with PD.
//
// The per-method notes below describe what the implementations in this file
// (pdManager and mockManager) do.
type Manager interface {
// CreateAffinityGroupsIfNotExists creates the groups in the given map, which
// is keyed by group ID with each group's key ranges as the value. Groups
// whose ID already exists are skipped, and an empty map is a no-op.
CreateAffinityGroupsIfNotExists(ctx context.Context, groups map[string][]pdhttp.AffinityGroupKeyRange) error
// DeleteAffinityGroups deletes the groups with the given IDs. An empty ID
// list is a no-op.
DeleteAffinityGroups(ctx context.Context, ids []string) error
// GetAffinityGroups returns the state of each requested group, keyed by
// group ID. IDs that are not found are simply absent from the returned map.
GetAffinityGroups(ctx context.Context, ids []string) (map[string]*pdhttp.AffinityGroupState, error)
}
// pdManager implements Manager by calling PD through a PD HTTP client.
type pdManager struct {
// Client is the embedded PD HTTP client that pdManager's methods call.
pdhttp.Client
}
// NewPDManager returns a Manager that performs every affinity group operation
// through the supplied PD HTTP client.
func NewPDManager(client pdhttp.Client) Manager {
	return &pdManager{Client: client}
}
// CreateAffinityGroupsIfNotExists creates in PD those entries of groups (keyed
// by group ID) that PD does not report yet. Groups that already exist are left
// untouched, so repeating the call, for example when a DDL job is retried,
// does not attempt to create them again.
//
// The existence check and the creation are two separate PD requests. An empty
// groups map, or one whose groups all exist already, returns nil without
// issuing a create request.
func (m *pdManager) CreateAffinityGroupsIfNotExists(ctx context.Context, groups map[string][]pdhttp.AffinityGroupKeyRange) error {
	if len(groups) == 0 {
		return nil
	}

	// Gather the requested IDs so the existing groups can be looked up.
	ids := make([]string, 0, len(groups))
	for groupID := range groups {
		ids = append(ids, groupID)
	}

	// TODO: move check to PD
	present, err := m.GetAffinityGroups(ctx, ids)
	if err != nil {
		return err
	}

	// Keep only the groups that PD did not return.
	missing := make(map[string][]pdhttp.AffinityGroupKeyRange)
	for groupID, keyRanges := range groups {
		if _, found := present[groupID]; found {
			continue
		}
		missing[groupID] = keyRanges
	}

	// Everything requested is already there: nothing to send.
	if len(missing) == 0 {
		return nil
	}

	_, err = m.Client.CreateAffinityGroups(ctx, missing)
	return err
}
// DeleteAffinityGroups removes the affinity groups with the given IDs from PD
// in a single batch request, with the force flag set to true. An empty ID list
// returns nil without contacting PD.
func (m *pdManager) DeleteAffinityGroups(ctx context.Context, ids []string) error {
	if len(ids) == 0 {
		return nil
	}

	const force = true
	return m.Client.BatchDeleteAffinityGroups(ctx, ids, force)
}
// GetAffinityGroups fetches the state of the requested affinity groups from PD.
// The returned map is keyed by group ID and contains only the requested IDs
// that PD returned; unknown IDs are omitted. An empty ID list yields an empty
// map without contacting PD. If the PD request fails, the map is nil and the
// error is returned unchanged.
func (m *pdManager) GetAffinityGroups(ctx context.Context, ids []string) (map[string]*pdhttp.AffinityGroupState, error) {
	selected := make(map[string]*pdhttp.AffinityGroupState)
	if len(ids) == 0 {
		return selected, nil
	}

	// TODO: avoid using GetAllAffinityGroups
	everything, err := m.Client.GetAllAffinityGroups(ctx)
	if err != nil {
		return nil, err
	}

	// Narrow the full listing down to the IDs the caller asked about.
	for _, id := range ids {
		state, found := everything[id]
		if !found {
			continue
		}
		selected[id] = state
	}
	return selected, nil
}
// mockManager is a Manager implementation that keeps affinity groups in an
// in-memory map instead of calling PD.
type mockManager struct {
// RWMutex is locked by the methods before they read or modify groups.
sync.RWMutex
// groups holds the recorded state of each group, keyed by group ID.
groups map[string]*pdhttp.AffinityGroupState
}
// NewMockManager returns a Manager for tests that stores affinity groups in
// memory, starting with no groups.
func NewMockManager() Manager {
	mgr := new(mockManager)
	mgr.groups = make(map[string]*pdhttp.AffinityGroupState)
	return mgr
}
// CreateAffinityGroupsIfNotExists records each group in groups (keyed by group
// ID) that the mock does not already hold. A new entry stores the group ID and
// the number of key ranges supplied for it; existing entries are left
// unchanged. The context is ignored and the returned error is always nil.
func (m *mockManager) CreateAffinityGroupsIfNotExists(_ context.Context, groups map[string][]pdhttp.AffinityGroupKeyRange) error {
	if len(groups) == 0 {
		return nil
	}

	m.Lock()
	defer m.Unlock()

	// Allocate the map on first use in case the mock was built without it.
	if m.groups == nil {
		m.groups = make(map[string]*pdhttp.AffinityGroupState)
	}

	for groupID, keyRanges := range groups {
		// Idempotent: an ID that is already recorded is skipped.
		if _, found := m.groups[groupID]; found {
			continue
		}
		group := pdhttp.AffinityGroup{ID: groupID}
		m.groups[groupID] = &pdhttp.AffinityGroupState{
			AffinityGroup: group,
			RangeCount:    len(keyRanges),
		}
	}
	return nil
}
// DeleteAffinityGroups removes the entries for the given IDs from the mock's
// in-memory map. IDs that are not present are ignored, the context is unused,
// and the returned error is always nil.
func (m *mockManager) DeleteAffinityGroups(_ context.Context, ids []string) error {
	if len(ids) == 0 {
		return nil
	}

	m.Lock()
	defer m.Unlock()

	for i := range ids {
		delete(m.groups, ids[i])
	}
	return nil
}
// GetAffinityGroups returns the recorded state of each requested ID that the
// mock holds, keyed by group ID; unknown IDs are omitted. The map values are
// the same pointers the mock stores internally, not copies. The context is
// unused and the returned error is always nil.
func (m *mockManager) GetAffinityGroups(_ context.Context, ids []string) (map[string]*pdhttp.AffinityGroupState, error) {
	found := make(map[string]*pdhttp.AffinityGroupState)

	m.RLock()
	defer m.RUnlock()

	for _, id := range ids {
		state, ok := m.groups[id]
		if !ok {
			continue
		}
		found[id] = state
	}
	return found, nil
}