116 lines
3.4 KiB
Go
116 lines
3.4 KiB
Go
// Copyright 2025 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package affinity
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/pingcap/tidb/pkg/util/logutil"
|
|
pdhttp "github.com/tikv/pd/client/http"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
const (
|
|
// maxRetryTimes is the max retry times for affinity group operations
|
|
maxRetryTimes = 3
|
|
// retryInterval is the sleep time before next retry
|
|
retryInterval = 200 * time.Millisecond
|
|
)
|
|
|
|
var (
|
|
manager Manager
|
|
pdClient pdhttp.Client
|
|
)
|
|
|
|
// InitManager initializes the package-level affinity manager and PD client.
|
|
// This is called by infosync during initialization.
|
|
func InitManager(pdCli pdhttp.Client) {
|
|
if pdCli == nil {
|
|
manager = NewMockManager()
|
|
} else {
|
|
manager = NewPDManager(pdCli)
|
|
}
|
|
pdClient = pdCli
|
|
}
|
|
|
|
// CreateGroupsIfNotExists creates affinity groups in PD (idempotent).
|
|
// It checks which groups already exist and only creates the ones that don't exist.
|
|
// This makes the operation safe for DDL job retries (e.g., after Owner switch or network failures).
|
|
func CreateGroupsIfNotExists(ctx context.Context, groups map[string][]pdhttp.AffinityGroupKeyRange) error {
|
|
if len(groups) == 0 {
|
|
return nil
|
|
}
|
|
return manager.CreateAffinityGroupsIfNotExists(ctx, groups)
|
|
}
|
|
|
|
// DeleteGroups removes affinity groups in PD with force=true.
|
|
func DeleteGroups(ctx context.Context, ids []string) error {
|
|
if len(ids) == 0 {
|
|
return nil
|
|
}
|
|
return manager.DeleteAffinityGroups(ctx, ids)
|
|
}
|
|
|
|
// DeleteGroupsWithRetry deletes groups with retry logic.
|
|
// This is a best-effort cleanup operation. Errors are logged on final failure.
|
|
func DeleteGroupsWithRetry(ctx context.Context, ids []string) error {
|
|
if len(ids) == 0 {
|
|
return nil
|
|
}
|
|
|
|
var err error
|
|
for i := 0; i <= maxRetryTimes; i++ {
|
|
err = DeleteGroups(ctx, ids)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if i != maxRetryTimes {
|
|
time.Sleep(retryInterval)
|
|
} else {
|
|
// Only log error on final retry failure
|
|
logutil.BgLogger().Error("Failed to delete affinity groups after retries", zap.Error(err), zap.Strings("groupIDs", ids))
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// GetGroups gets affinity groups by their IDs.
|
|
func GetGroups(ctx context.Context, ids []string) (map[string]*pdhttp.AffinityGroupState, error) {
|
|
if len(ids) == 0 {
|
|
return make(map[string]*pdhttp.AffinityGroupState), nil
|
|
}
|
|
return manager.GetAffinityGroups(ctx, ids)
|
|
}
|
|
|
|
// GetAllGroupStates gets all affinity group states from PD.
|
|
// This is used by SHOW AFFINITY to directly query PD without going through the manager.
|
|
func GetAllGroupStates(ctx context.Context) (map[string]*pdhttp.AffinityGroupState, error) {
|
|
if pdClient == nil {
|
|
return make(map[string]*pdhttp.AffinityGroupState), nil
|
|
}
|
|
return pdClient.GetAllAffinityGroups(ctx)
|
|
}
|
|
|
|
// SetPDClientForTest sets the PD client for testing.
|
|
// Please do not use it in the production environment.
|
|
func SetPDClientForTest(cli pdhttp.Client) func() {
|
|
originalCli := pdClient
|
|
pdClient = cli
|
|
return func() {
|
|
pdClient = originalCli
|
|
}
|
|
}
|