// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package pdutil

import (
	"context"
	"crypto/tls"
	"encoding/hex"
	"fmt"
	"math"
	"net/http"
	"strings"
	"time"

	"github.com/coreos/go-semver/semver"
	"github.com/docker/go-units"
	"github.com/google/uuid"
	"github.com/opentracing/opentracing-go"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/log"
	berrors "github.com/pingcap/tidb/br/pkg/errors"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/util/codec"
	pd "github.com/tikv/pd/client"
	pdhttp "github.com/tikv/pd/client/http"
	"github.com/tikv/pd/client/opt"
	"github.com/tikv/pd/client/pkg/caller"
	"github.com/tikv/pd/client/pkg/retry"
	"go.uber.org/zap"
	"google.golang.org/grpc"
)

const (
	maxMsgSize   = int(128 * units.MiB) // pd.ScanRegion may return a large response
	pauseTimeout = 5 * time.Minute
	// PDRequestRetryTime is the retry time for PD requests when the connection fails.
	PDRequestRetryTime = 120
	// Set max-pending-peer-count to a large value to avoid scatter-region failures.
	maxPendingPeerUnlimited uint64 = math.MaxInt32
)

// pauseConfigGenerator generates a config value according to the store count
// and the current value.
type pauseConfigGenerator func(int, any) any

// zeroPauseConfig sets the config to 0.
func zeroPauseConfig(int, any) any {
	return 0
}

// pauseConfigMulStores multiplies the existing value by the number of stores.
// The result is capped at 40, as a larger value may make the cluster unstable.
func pauseConfigMulStores(stores int, raw any) any {
	rawCfg := raw.(float64)
	return math.Min(40, rawCfg*float64(stores))
}

// pauseConfigFalse sets the config to "false".
func pauseConfigFalse(int, any) any {
	return "false"
}

// constConfigGeneratorBuilder builds a pauseConfigGenerator that always
// returns the given constant value.
func constConfigGeneratorBuilder(val any) pauseConfigGenerator {
	return func(int, any) any {
		return val
	}
}

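// Illustrative note (not from the original source): with 5 stores and a
// current "leader-schedule-limit" of 4, pauseConfigMulStores returns
// min(40, 4*5) = 20, while a builder-made generator ignores both arguments:
//
//	gen := constConfigGeneratorBuilder(maxPendingPeerUnlimited)
//	_ = gen(5, nil) // always maxPendingPeerUnlimited
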
// ClusterConfig represents a set of schedulers whose configs have been
// modified, along with their original configs.
type ClusterConfig struct {
	// Enable PD schedulers before restore.
	Schedulers []string `json:"schedulers"`
	// Original schedule configuration.
	ScheduleCfg map[string]any `json:"schedule_cfg"`
	// The registered region label rule ID.
	RuleID string `json:"rule_id"`
}

type pauseSchedulerBody struct {
	Delay int64 `json:"delay"`
}

var (
	// Since v4.0.8 we can use pause configs.
	// See https://github.com/tikv/pd/pull/3088.
	pauseConfigVersion = semver.Version{Major: 4, Minor: 0, Patch: 8}

	// Since v6.1.0 we can pause schedulers by key range with TTL.
	minVersionForRegionLabelTTL = semver.Version{Major: 6, Minor: 1, Patch: 0}

	// Schedulers represent region/leader schedulers which can impact performance.
	Schedulers = map[string]struct{}{
		"balance-leader-scheduler":     {},
		"balance-hot-region-scheduler": {},
		"balance-region-scheduler":     {},

		"shuffle-leader-scheduler":     {},
		"shuffle-region-scheduler":     {},
		"shuffle-hot-region-scheduler": {},

		"evict-slow-store-scheduler": {},
	}
	expectPDCfgGenerators = map[string]pauseConfigGenerator{
		"merge-schedule-limit": zeroPauseConfig,
		// TODO: "leader-schedule-limit" and "region-schedule-limit" don't support TTL for now,
		// but we still need to set these configs for compatibility with old versions.
		// We need to wait for https://github.com/tikv/pd/pull/3131 to be merged.
		// See details in https://github.com/pingcap/br/pull/592#discussion_r522684325.
		"leader-schedule-limit":       pauseConfigMulStores,
		"region-schedule-limit":       pauseConfigMulStores,
		"max-snapshot-count":          pauseConfigMulStores,
		"enable-location-replacement": pauseConfigFalse,
		"max-pending-peer-count":      constConfigGeneratorBuilder(maxPendingPeerUnlimited),
	}

	// defaultPDCfg is taken from https://github.com/tikv/pd/blob/master/conf/config.toml.
	// It is only used by the debug command.
	defaultPDCfg = map[string]any{
		"merge-schedule-limit":        8,
		"leader-schedule-limit":       4,
		"region-schedule-limit":       2048,
		"enable-location-replacement": "true",
	}
)

// DefaultExpectPDCfgGenerators returns a copy of the default PD config generators.
func DefaultExpectPDCfgGenerators() map[string]pauseConfigGenerator {
	clone := make(map[string]pauseConfigGenerator, len(expectPDCfgGenerators))
	for k, v := range expectPDCfgGenerators {
		clone[k] = v
	}
	return clone
}

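// Usage sketch (the override below is hypothetical, shown for illustration):
//
//	gens := DefaultExpectPDCfgGenerators()
//	// pin "max-snapshot-count" to a constant instead of scaling by store count
//	gens["max-snapshot-count"] = constConfigGeneratorBuilder(64)
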
// PdController manages getting/updating config from PD.
type PdController struct {
	pdClient  pd.Client
	pdHTTPCli pdhttp.Client
	version   *semver.Version

	// controls the pause-schedulers goroutine
	schedulerPauseCh chan struct{}
	// controls the TTL of pausing schedulers
	SchedulerPauseTTL time.Duration
}

// NewPdController creates a new PdController.
func NewPdController(
	ctx context.Context,
	pdAddrs []string,
	tlsConf *tls.Config,
	securityOption pd.SecurityOption,
) (*PdController, error) {
	maxCallMsgSize := []grpc.DialOption{
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
		grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)),
	}
	pdClient, err := pd.NewClientWithContext(
		ctx, caller.Component("br-pd-controller"), pdAddrs, securityOption,
		opt.WithGRPCDialOptions(maxCallMsgSize...),
		// If the timeout is too short, we may scatter a region many times
		// because the `ScatterRegions` call may time out.
		opt.WithCustomTimeoutOption(60*time.Second),
	)
	if err != nil {
		log.Error("fail to create pd client", zap.Error(err))
		return nil, errors.Trace(err)
	}

	pdHTTPCliConfig := make([]pdhttp.ClientOption, 0, 1)
	if tlsConf != nil {
		pdHTTPCliConfig = append(pdHTTPCliConfig, pdhttp.WithTLSConfig(tlsConf))
	}
	pdHTTPCli := pdhttp.NewClientWithServiceDiscovery(
		"br/lightning PD controller",
		pdClient.GetServiceDiscovery(),
		pdHTTPCliConfig...,
	).WithBackoffer(retry.InitialBackoffer(time.Second, time.Second, PDRequestRetryTime*time.Second))
	versionStr, err := pdHTTPCli.GetPDVersion(ctx)
	if err != nil {
		pdHTTPCli.Close()
		pdClient.Close()
		return nil, errors.Trace(err)
	}
	version := parseVersion(versionStr)

	return &PdController{
		pdClient:  pdClient,
		pdHTTPCli: pdHTTPCli,
		version:   version,
		// We should make a buffered channel here, otherwise when the context is
		// canceled, graceful shutdown will get stuck resuming schedulers.
		schedulerPauseCh: make(chan struct{}, 1),
	}, nil
}

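// Usage sketch (the address is a placeholder; error handling elided):
//
//	ctx := context.Background()
//	pdCtl, err := NewPdController(ctx, []string{"127.0.0.1:2379"}, nil, pd.SecurityOption{})
//	if err != nil {
//		// handle error
//	}
//	defer pdCtl.Close()
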
// NewPdControllerWithPDClient creates a PdController from existing PD clients
// and a known version.
func NewPdControllerWithPDClient(pdClient pd.Client, pdHTTPCli pdhttp.Client, v *semver.Version) *PdController {
	return &PdController{
		pdClient:         pdClient,
		pdHTTPCli:        pdHTTPCli,
		version:          v,
		schedulerPauseCh: make(chan struct{}, 1),
	}
}

func parseVersion(versionStr string) *semver.Version {
	// We need to trim spaces and quotes, or semver will fail to parse.
	v := strings.TrimSpace(versionStr)
	v = strings.Trim(v, "\"")
	v = strings.TrimPrefix(v, "v")
	version, err := semver.NewVersion(v)
	if err != nil {
		log.Warn("fall back to v0.0.0 version",
			zap.String("version", versionStr), zap.Error(err))
		version = &semver.Version{Major: 0, Minor: 0, Patch: 0}
	}
	failpoint.Inject("PDEnabledPauseConfig", func(val failpoint.Value) {
		if val.(bool) {
			// For tests: pretend pause config is enabled.
			version = &semver.Version{Major: 5, Minor: 0, Patch: 0}
		}
	})
	return version
}

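// Illustrative inputs and results:
//
//	parseVersion("\"v6.5.0\"\n")  // 6.5.0 (quotes, spaces and the "v" prefix are stripped)
//	parseVersion("not-a-version") // falls back to 0.0.0 with a warning log
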
func (p *PdController) isPauseConfigEnabled() bool {
	return p.version.Compare(pauseConfigVersion) >= 0
}

// SetPDClient sets the pd client; for test only.
func (p *PdController) SetPDClient(pdClient pd.Client) {
	p.pdClient = pdClient
}

// GetPDClient returns the pd client.
func (p *PdController) GetPDClient() pd.Client {
	return p.pdClient
}

// GetPDHTTPClient returns the pd http client.
func (p *PdController) GetPDHTTPClient() pdhttp.Client {
	return p.pdHTTPCli
}

// GetClusterVersion returns the current cluster version.
func (p *PdController) GetClusterVersion(ctx context.Context) (string, error) {
	v, err := p.pdHTTPCli.GetClusterVersion(ctx)
	return v, errors.Trace(err)
}

// GetRegionCount returns the region count in the specified range.
func (p *PdController) GetRegionCount(ctx context.Context, startKey, endKey []byte) (int, error) {
	// TiKV reports region start/end keys to PD in the memcomparable format.
	var start, end []byte
	start = codec.EncodeBytes(nil, startKey)
	if len(endKey) != 0 { // An empty end key means the maximum key.
		end = codec.EncodeBytes(nil, endKey)
	}
	status, err := p.pdHTTPCli.GetRegionStatusByKeyRange(ctx, pdhttp.NewKeyRange(start, end), true)
	if err != nil {
		return 0, errors.Trace(err)
	}
	return status.Count, nil
}

// GetStoreInfo returns the info of the store with the specified ID.
func (p *PdController) GetStoreInfo(ctx context.Context, storeID uint64) (*pdhttp.StoreInfo, error) {
	info, err := p.pdHTTPCli.GetStore(ctx, storeID)
	return info, errors.Trace(err)
}

func (p *PdController) doPauseSchedulers(
	ctx context.Context,
	schedulers []string,
) ([]string, error) {
	// Pause these schedulers for the configured TTL (300 seconds by default).
	delay := int64(p.ttlOfPausing().Seconds())
	removedSchedulers := make([]string, 0, len(schedulers))
	for _, scheduler := range schedulers {
		err := p.pdHTTPCli.SetSchedulerDelay(ctx, scheduler, delay)
		if err != nil {
			return removedSchedulers, errors.Trace(err)
		}
		removedSchedulers = append(removedSchedulers, scheduler)
	}
	return removedSchedulers, nil
}

func (p *PdController) pauseSchedulersAndConfigWith(
	ctx context.Context, schedulers []string,
	schedulerCfg map[string]any,
) ([]string, error) {
	// Pause the schedulers once before starting the keep-alive loop: if this
	// first attempt fails we return the error, while failures inside the loop
	// below are only logged and retried on the next tick.
	removedSchedulers, err := p.doPauseSchedulers(ctx, schedulers)
	if err != nil {
		log.Error("failed to pause scheduler at beginning",
			zap.Strings("name", schedulers), zap.Error(err))
		return nil, errors.Trace(err)
	}
	log.Info("pause scheduler successful at beginning", zap.Strings("name", schedulers))
	if schedulerCfg != nil {
		err = p.doPauseConfigs(ctx, schedulerCfg)
		if err != nil {
			log.Error("failed to pause config at beginning",
				zap.Any("cfg", schedulerCfg), zap.Error(err))
			return nil, errors.Trace(err)
		}
		log.Info("pause configs successful at beginning", zap.Any("cfg", schedulerCfg))
	}

	go func() {
		tick := time.NewTicker(p.ttlOfPausing() / 3)
		defer tick.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
				_, err := p.doPauseSchedulers(ctx, schedulers)
				if err != nil {
					log.Warn("pause scheduler failed, ignore it and wait for the next tick", zap.Error(err))
				}
				if schedulerCfg != nil {
					err = p.doPauseConfigs(ctx, schedulerCfg)
					if err != nil {
						log.Warn("pause configs failed, ignore it and wait for the next tick", zap.Error(err))
					}
				}
				log.Info("pause scheduler(configs)", zap.Strings("name", removedSchedulers),
					zap.Any("cfg", schedulerCfg))
			case <-p.schedulerPauseCh:
				log.Info("exit pause scheduler and configs successful")
				return
			}
		}
	}()
	return removedSchedulers, nil
}

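// The pause lifecycle: pauses are issued with a TTL and refreshed every TTL/3,
// so if BR exits unexpectedly the cluster self-heals once the TTL expires; an
// explicit resume sends to schedulerPauseCh, which stops the refresh goroutine.
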
// ResumeSchedulers resumes the given PD schedulers.
func (p *PdController) ResumeSchedulers(ctx context.Context, schedulers []string) error {
	return errors.Trace(p.resumeSchedulerWith(ctx, schedulers))
}

// ResumeRegionLabelRule deletes the region label rule with the given ID so
// that scheduling resumes on its key range.
func (p *PdController) ResumeRegionLabelRule(ctx context.Context, ruleID string) {
	if ruleID == "" {
		return
	}
	ruleRet, err := p.pdHTTPCli.GetRegionLabelRulesByIDs(ctx, []string{ruleID})
	if err != nil || len(ruleRet) == 0 {
		log.Warn("failed to get the region label rule, the rule may have been removed", zap.String("rule-id", ruleID))
		return
	}
	rule := ruleRet[0]
	// Set the TTL to 0 to expire the rule immediately.
	rule.Labels[0].TTL = time.Duration(0).String()
	deleteRule := &pdhttp.LabelRulePatch{DeleteRules: []string{ruleID}}
	if err := p.pdHTTPCli.PatchRegionLabelRules(ctx, deleteRule); err != nil {
		log.Warn("failed to delete region label rule, the rule will be removed after the TTL expires",
			zap.String("rule-id", rule.ID), zap.Error(err))
	}
}

func (p *PdController) resumeSchedulerWith(ctx context.Context, schedulers []string) (err error) {
	if len(schedulers) == 0 {
		return nil
	}
	log.Info("resume scheduler", zap.Strings("schedulers", schedulers))
	p.schedulerPauseCh <- struct{}{}

	// A delay of 0 means stop pausing.
	delay := int64(0)
	for _, scheduler := range schedulers {
		err = p.pdHTTPCli.SetSchedulerDelay(ctx, scheduler, delay)
		if err != nil {
			log.Error("failed to resume scheduler after retry, you may reset this scheduler manually "+
				"or just wait for the scheduler pause to time out", zap.String("scheduler", scheduler))
		} else {
			log.Info("resume scheduler successful", zap.String("scheduler", scheduler))
		}
	}
	// No need to return an error: the pause will time out anyway.
	return nil
}

// ListSchedulers lists all PD schedulers.
func (p *PdController) ListSchedulers(ctx context.Context) ([]string, error) {
	s, err := p.pdHTTPCli.GetSchedulers(ctx)
	return s, errors.Trace(err)
}

// GetPDScheduleConfig returns the PD schedule config.
func (p *PdController) GetPDScheduleConfig(ctx context.Context) (map[string]any, error) {
	cfg, err := p.pdHTTPCli.GetScheduleConfig(ctx)
	return cfg, errors.Trace(err)
}

// UpdatePDScheduleConfig resets the PD schedule config to the default values.
func (p *PdController) UpdatePDScheduleConfig(ctx context.Context) error {
	log.Info("update pd with default config", zap.Any("cfg", defaultPDCfg))
	return errors.Trace(p.doUpdatePDScheduleConfig(ctx, defaultPDCfg))
}

func (p *PdController) doUpdatePDScheduleConfig(
	ctx context.Context, cfg map[string]any, ttlSeconds ...float64,
) error {
	newCfg := make(map[string]any)
	for k, v := range cfg {
		// To use TTL, the key must carry the config prefix, i.e.
		// "max-merge-region-keys" becomes "schedule.max-merge-region-keys".
		sc := fmt.Sprintf("schedule.%s", k)
		newCfg[sc] = v
	}

	if err := p.pdHTTPCli.SetConfig(ctx, newCfg, ttlSeconds...); err != nil {
		return errors.Annotatef(
			berrors.ErrPDUpdateFailed,
			"failed to update PD schedule config: %s",
			err.Error(),
		)
	}
	return nil
}

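// Illustrative call (values are placeholders): the line below would send
// {"schedule.merge-schedule-limit": 0} to PD with a 300-second TTL:
//
//	_ = p.doUpdatePDScheduleConfig(ctx, map[string]any{"merge-schedule-limit": 0}, 300)
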
func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]any) error {
	// Pause these configs for the configured TTL (300 seconds by default).
	return errors.Trace(p.doUpdatePDScheduleConfig(ctx, cfg, p.ttlOfPausing().Seconds()))
}

func restoreSchedulers(ctx context.Context, pd *PdController, clusterCfg ClusterConfig,
	configsNeedRestore map[string]pauseConfigGenerator) error {
	if err := pd.ResumeSchedulers(ctx, clusterCfg.Schedulers); err != nil {
		return errors.Annotate(err, "fail to add PD schedulers")
	}
	pd.ResumeRegionLabelRule(ctx, clusterCfg.RuleID)
	log.Info("restoring config", zap.Any("config", clusterCfg.ScheduleCfg))
	mergeCfg := make(map[string]any)
	for cfgKey := range configsNeedRestore {
		value := clusterCfg.ScheduleCfg[cfgKey]
		if value == nil {
			// Ignore nonexistent config.
			continue
		}
		mergeCfg[cfgKey] = value
	}

	prefix := make([]float64, 0, 1)
	if pd.isPauseConfigEnabled() {
		// Set the config TTL to zero to invalidate the temporary config immediately.
		prefix = append(prefix, 0)
	}
	// Reset the config to its previous value.
	if err := pd.doUpdatePDScheduleConfig(ctx, mergeCfg, prefix...); err != nil {
		return errors.Annotate(err, "fail to update PD merge config")
	}
	return nil
}

// MakeUndoFunctionByConfig returns an UndoFunc based on the specified ClusterConfig.
func (p *PdController) MakeUndoFunctionByConfig(config ClusterConfig) UndoFunc {
	return p.GenRestoreSchedulerFunc(config, expectPDCfgGenerators)
}

// MakeFineGrainedUndoFunction returns an UndoFunc that runs undoFunc first and
// then restores the schedulers and configs.
func (p *PdController) MakeFineGrainedUndoFunction(config ClusterConfig, undoFunc func()) UndoFunc {
	restore := func(ctx context.Context) error {
		undoFunc()
		return restoreSchedulers(ctx, p, config, expectPDCfgGenerators)
	}
	return restore
}

// GenRestoreSchedulerFunc generates a restore function.
func (p *PdController) GenRestoreSchedulerFunc(config ClusterConfig,
	configsNeedRestore map[string]pauseConfigGenerator) UndoFunc {
	// TODO: we only need the config names, not a map[string]pauseConfigGenerator.
	restore := func(ctx context.Context) error {
		return restoreSchedulers(ctx, p, config, configsNeedRestore)
	}
	return restore
}

// RemoveSchedulers removes the schedulers that may slow down BR.
func (p *PdController) RemoveSchedulers(ctx context.Context) (undo UndoFunc, err error) {
	undo = Nop

	origin, _, err1 := p.RemoveSchedulersWithOrigin(ctx)
	if err1 != nil {
		err = err1
		return
	}

	undo = p.MakeUndoFunctionByConfig(ClusterConfig{Schedulers: origin.Schedulers, ScheduleCfg: origin.ScheduleCfg})
	return undo, errors.Trace(err)
}

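// Usage sketch (error handling elided):
//
//	undo, err := p.RemoveSchedulers(ctx)
//	if err != nil {
//		// handle error
//	}
//	defer func() {
//		if err := undo(ctx); err != nil {
//			log.Warn("failed to restore schedulers", zap.Error(err))
//		}
//	}()
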
// RemoveSchedulersWithConfig removes the schedulers that may slow down BR and
// also returns the original ClusterConfig.
func (p *PdController) RemoveSchedulersWithConfig(
	ctx context.Context,
) (undo UndoFunc, config *ClusterConfig, err error) {
	undo = Nop

	origin, _, err1 := p.RemoveSchedulersWithOrigin(ctx)
	if err1 != nil {
		err = err1
		return
	}

	undo = p.MakeUndoFunctionByConfig(
		ClusterConfig{Schedulers: origin.Schedulers, ScheduleCfg: origin.ScheduleCfg, RuleID: ""},
	)
	return undo, &origin, errors.Trace(err)
}

// RemoveAllPDSchedulers pauses all PD schedulers during snapshot backup and restore.
func (p *PdController) RemoveAllPDSchedulers(ctx context.Context) (undo UndoFunc, err error) {
	undo = Nop

	// During the backup we stop all schedulers so that restore is easy to implement.
	// During phase-2, PD is fresh and in recovering mode (recovering-mark=true); there is
	// no leader, so initially there is no leader or region scheduling. Once phase-2 starts
	// force-setting leaders, scheduling may begin. We don't want PD to do any leader or
	// region scheduling during this time, so we set those params to 0 before we
	// force-set leaders.
	const enableTiKVSplitRegion = "enable-tikv-split-region"
	scheduleLimitParams := []string{
		"hot-region-schedule-limit",
		"leader-schedule-limit",
		"merge-schedule-limit",
		"region-schedule-limit",
		"replica-schedule-limit",
		enableTiKVSplitRegion,
	}
	pdConfigGenerators := DefaultExpectPDCfgGenerators()
	for _, param := range scheduleLimitParams {
		if param == enableTiKVSplitRegion {
			pdConfigGenerators[param] = func(int, any) any { return false }
		} else {
			pdConfigGenerators[param] = func(int, any) any { return 0 }
		}
	}

	oldPDConfig, _, err1 := p.RemoveSchedulersWithConfigGenerator(ctx, pdConfigGenerators)
	if err1 != nil {
		err = err1
		return
	}

	undo = p.GenRestoreSchedulerFunc(oldPDConfig, pdConfigGenerators)
	return undo, errors.Trace(err)
}

// RemoveSchedulersWithOrigin pauses and removes BR-related schedulers and
// configs, returning both the original and the modified configs.
func (p *PdController) RemoveSchedulersWithOrigin(ctx context.Context) (
	origin ClusterConfig,
	modified ClusterConfig,
	err error,
) {
	origin, modified, err = p.RemoveSchedulersWithConfigGenerator(ctx, expectPDCfgGenerators)
	err = errors.Trace(err)
	return
}

// RemoveSchedulersWithConfigGenerator pauses schedulers with a custom config generator.
func (p *PdController) RemoveSchedulersWithConfigGenerator(
	ctx context.Context,
	pdConfigGenerators map[string]pauseConfigGenerator,
) (origin ClusterConfig, modified ClusterConfig, err error) {
	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
		span1 := span.Tracer().StartSpan("PdController.RemoveSchedulers",
			opentracing.ChildOf(span.Context()))
		defer span1.Finish()
		ctx = opentracing.ContextWithSpan(ctx, span1)
	}

	originCfg := ClusterConfig{}
	removedCfg := ClusterConfig{}
	stores, err := p.pdClient.GetAllStores(ctx)
	if err != nil {
		return originCfg, removedCfg, errors.Trace(err)
	}
	scheduleCfg, err := p.GetPDScheduleConfig(ctx)
	if err != nil {
		return originCfg, removedCfg, errors.Trace(err)
	}
	disablePDCfg := make(map[string]any, len(pdConfigGenerators))
	originPDCfg := make(map[string]any, len(pdConfigGenerators))
	for cfgKey, cfgValFunc := range pdConfigGenerators {
		value, ok := scheduleCfg[cfgKey]
		if !ok {
			// Ignore nonexistent config.
			continue
		}
		disablePDCfg[cfgKey] = cfgValFunc(len(stores), value)
		originPDCfg[cfgKey] = value
	}
	originCfg.ScheduleCfg = originPDCfg
	removedCfg.ScheduleCfg = disablePDCfg

	log.Debug("saved PD config", zap.Any("config", scheduleCfg))

	// Remove the default PD schedulers that may affect the restore process.
	existSchedulers, err := p.ListSchedulers(ctx)
	if err != nil {
		return originCfg, removedCfg, errors.Trace(err)
	}
	needRemoveSchedulers := make([]string, 0, len(existSchedulers))
	for _, s := range existSchedulers {
		if _, ok := Schedulers[s]; ok {
			needRemoveSchedulers = append(needRemoveSchedulers, s)
		}
	}

	removedSchedulers, err := p.doRemoveSchedulersWith(ctx, needRemoveSchedulers, disablePDCfg)
	if err != nil {
		return originCfg, removedCfg, errors.Trace(err)
	}

	originCfg.Schedulers = removedSchedulers
	removedCfg.Schedulers = removedSchedulers

	return originCfg, removedCfg, nil
}

// GetOriginPDConfig returns the current PD schedule config as a ClusterConfig.
func (p *PdController) GetOriginPDConfig(
	ctx context.Context,
) (origin ClusterConfig, err error) {
	originCfg := ClusterConfig{}
	scheduleCfg, err := p.GetPDScheduleConfig(ctx)
	if err != nil {
		return originCfg, errors.Trace(err)
	}
	originCfg.ScheduleCfg = scheduleCfg

	log.Debug("saved PD config", zap.Any("config", scheduleCfg))

	return originCfg, nil
}

// RemoveSchedulersOnRegion pauses schedulers for the given key ranges. To
// resume them, call the returned cancel function, then wait for it to return
// to ensure the schedulers are all resumed.
func (p *PdController) RemoveSchedulersOnRegion(ctx context.Context, keyRange [][2]kv.Key) (string, func(), error) {
	log.Info("removing scheduler on region")
	schedulerCtx, cancelScheduler := context.WithCancel(ctx)
	done, ruleID, err := pauseSchedulerByKeyRangeWithTTL(schedulerCtx, p.pdHTTPCli, keyRange, pauseTimeout)
	// Wait for the rule to take effect because the PD operator is processed asynchronously.
	// To synchronize this, checking the operator status may not be enough. For details, see
	// https://github.com/pingcap/tidb/issues/49477.
	// Let's use two times the default value of `patrol-region-interval` from the PD configuration.
	<-time.After(20 * time.Millisecond)
	waitPauseSchedulerDone := func() {
		if done == nil {
			return
		}

		// Cancel the context; this causes the background goroutine to exit.
		cancelScheduler()

		// Wait for the goroutine to finish cleanup and close the done channel.
		<-done
		log.Info("scheduler pause cancelled and cleaned up")
	}

	return ruleID, waitPauseSchedulerDone, errors.Trace(err)
}

// RemoveSchedulersWithCfg removes PD schedulers and configs with the specified ClusterConfig.
func (p *PdController) RemoveSchedulersWithCfg(ctx context.Context, removeCfg ClusterConfig) error {
	_, err := p.doRemoveSchedulersWith(ctx, removeCfg.Schedulers, removeCfg.ScheduleCfg)
	return errors.Trace(err)
}

func (p *PdController) doRemoveSchedulersWith(
	ctx context.Context,
	needRemoveSchedulers []string,
	disablePDCfg map[string]any,
) ([]string, error) {
	if !p.isPauseConfigEnabled() {
		return nil, errors.Errorf("pd version %s does not support pausing configs, please upgrade", p.version.String())
	}
	// Since v4.0.8 we can set these configs with TTL.
	s, err := p.pauseSchedulersAndConfigWith(ctx, needRemoveSchedulers, disablePDCfg)
	return s, errors.Trace(err)
}

// GetMinResolvedTS returns the min-resolved-ts from PD.
func (p *PdController) GetMinResolvedTS(ctx context.Context) (uint64, error) {
	ts, _, err := p.pdHTTPCli.GetMinResolvedTSByStoresIDs(ctx, nil)
	return ts, errors.Trace(err)
}

// RecoverBaseAllocID recovers the base alloc ID.
func (p *PdController) RecoverBaseAllocID(ctx context.Context, id uint64) error {
	return errors.Trace(p.pdHTTPCli.ResetBaseAllocID(ctx, id))
}

// ResetTS resets the current TS of PD.
func (p *PdController) ResetTS(ctx context.Context, ts uint64) error {
	// PD's reset-ts will never set a TS smaller than the current PD TS; we set
	// force-use-larger=true to allow a TS larger than current PD TS + 24h (the default limit).
	err := p.pdHTTPCli.ResetTS(ctx, ts, true)
	if err == nil {
		return nil
	}
	if strings.Contains(err.Error(), http.StatusText(http.StatusForbidden)) {
		log.Info("reset-ts returns with status forbidden, ignore")
		return nil
	}
	return errors.Trace(err)
}

// MarkRecovering marks PD as being in snapshot recovering mode.
func (p *PdController) MarkRecovering(ctx context.Context) error {
	return errors.Trace(p.pdHTTPCli.SetSnapshotRecoveringMark(ctx))
}

// UnmarkRecovering clears PD's snapshot recovering mark.
func (p *PdController) UnmarkRecovering(ctx context.Context) error {
	return errors.Trace(p.pdHTTPCli.DeleteSnapshotRecoveringMark(ctx))
}

// RegionLabel is the label of a region. This struct is partially copied from
// https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L31.
type RegionLabel struct {
	Key     string `json:"key"`
	Value   string `json:"value"`
	TTL     string `json:"ttl,omitempty"`
	StartAt string `json:"start_at,omitempty"`
}

// LabelRule is the rule to assign labels to a region. This struct is partially copied from
// https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L41.
type LabelRule struct {
	ID       string        `json:"id"`
	Labels   []RegionLabel `json:"labels"`
	RuleType string        `json:"rule_type"`
	Data     any           `json:"data"`
}

// KeyRangeRule contains the start key and end key of the LabelRule. This struct is partially copied from
// https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L62.
type KeyRangeRule struct {
	StartKeyHex string `json:"start_key"` // hex format start key, for marshal/unmarshal
	EndKeyHex   string `json:"end_key"`   // hex format end key, for marshal/unmarshal
}

// PauseSchedulersByKeyRange will pause schedulers for regions in the specific key range.
// This function will spawn a goroutine to keep pausing schedulers periodically until the context is done.
// The returned done channel is used to notify the caller that the background goroutine has exited.
func PauseSchedulersByKeyRange(
	ctx context.Context,
	pdHTTPCli pdhttp.Client,
	startKey, endKey []byte,
) (done <-chan struct{}, err error) {
	done, _, err = pauseSchedulerByKeyRangeWithTTL(ctx, pdHTTPCli, [][2]kv.Key{{startKey, endKey}}, pauseTimeout)
	// Wait for the rule to take effect because the PD operator is processed asynchronously.
	// To synchronize this, checking the operator status may not be enough. For details, see
	// https://github.com/pingcap/tidb/issues/49477.
	// Let's use two times the default value of `patrol-region-interval` from the PD configuration.
	<-time.After(20 * time.Millisecond)
	return done, errors.Trace(err)
}

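// Usage sketch (keys are placeholders; error handling elided):
//
//	cctx, cancel := context.WithCancel(ctx)
//	done, err := PauseSchedulersByKeyRange(cctx, pdHTTPCli, startKey, endKey)
//	// ... work while scheduling is denied on the range ...
//	cancel()
//	if done != nil {
//		<-done // wait until the label rule has been cleaned up
//	}
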
func pauseSchedulerByKeyRangeWithTTL(
	ctx context.Context,
	pdHTTPCli pdhttp.Client,
	keyRange [][2]kv.Key,
	ttl time.Duration,
) (<-chan struct{}, string, error) {
	var encodedKeyRangeRule = []KeyRangeRule{}
	// No table to restore: return an empty rule.
	if len(keyRange) == 0 || keyRange[0][0] == nil {
		return nil, "", nil
	}
	for _, keyPair := range keyRange {
		var rule KeyRangeRule
		rule.StartKeyHex = hex.EncodeToString(keyPair[0])
		rule.EndKeyHex = hex.EncodeToString(keyPair[1])
		encodedKeyRangeRule = append(encodedKeyRangeRule, rule)
	}
	rule := &pdhttp.LabelRule{
		ID: uuid.New().String(),
		Labels: []pdhttp.RegionLabel{{
			Key:   "schedule",
			Value: "deny",
			TTL:   ttl.String(),
		}},
		RuleType: "key-range",
		// Data should be a list of KeyRangeRule when the rule type is key-range.
		// See https://github.com/tikv/pd/blob/783d060861cef37c38cbdcab9777fe95c17907fe/server/schedule/labeler/rules.go#L169.
		Data: encodedKeyRangeRule,
	}
	done := make(chan struct{})

	if err := pdHTTPCli.SetRegionLabelRule(ctx, rule); err != nil {
		close(done)
		return nil, "", errors.Trace(err)
	}

	go func() {
		defer close(done)
		ticker := time.NewTicker(ttl / 3)
		defer ticker.Stop()
	loop:
		for {
			select {
			case <-ticker.C:
				if err := pdHTTPCli.SetRegionLabelRule(ctx, rule); err != nil {
					if berrors.IsContextCanceled(err) {
						break loop
					}
					log.Warn("pause scheduler by key range failed, ignore it and wait for the next tick",
						zap.Error(err))
				}
			case <-ctx.Done():
				break loop
			}
		}
		// Use a new context to avoid it being canceled by the caller.
		recoverCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
		defer cancel()
		// Set the TTL to 0 to remove the rule.
		rule.Labels[0].TTL = time.Duration(0).String()
		deleteRule := &pdhttp.LabelRulePatch{DeleteRules: []string{rule.ID}}
		if err := pdHTTPCli.PatchRegionLabelRules(recoverCtx, deleteRule); err != nil {
			log.Warn("failed to delete region label rule, the rule will be removed after the TTL expires",
				zap.String("rule-id", rule.ID), zap.Duration("ttl", ttl), zap.Error(err))
		}
	}()
	return done, rule.ID, nil
}

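// For reference (illustrative, derived from the struct JSON tags above), the
// label rule sent to PD looks like:
//
//	{
//	  "id": "<uuid>",
//	  "labels": [{"key": "schedule", "value": "deny", "ttl": "5m0s"}],
//	  "rule_type": "key-range",
//	  "data": [{"start_key": "<hex>", "end_key": "<hex>"}]
//	}
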
// CanPauseSchedulerByKeyRange returns whether the scheduler can be paused by key range.
func (p *PdController) CanPauseSchedulerByKeyRange() bool {
	// We need the TTL feature so that schedulers can recover from pause automatically.
	return p.version.Compare(minVersionForRegionLabelTTL) >= 0
}

// Close closes the connection to PD.
func (p *PdController) Close() {
	p.pdClient.Close()
	if p.pdHTTPCli != nil {
		// It may be nil in some unit tests.
		p.pdHTTPCli.Close()
	}
	if p.schedulerPauseCh != nil {
		close(p.schedulerPauseCh)
	}
}

func (p *PdController) ttlOfPausing() time.Duration {
	if p.SchedulerPauseTTL > 0 {
		return p.SchedulerPauseTTL
	}
	return pauseTimeout
}

// FetchPDVersion gets the PD version.
func FetchPDVersion(ctx context.Context, pdHTTPCli pdhttp.Client) (*semver.Version, error) {
	ver, err := pdHTTPCli.GetPDVersion(ctx)
	if err != nil {
		return nil, errors.Trace(err)
	}

	return parseVersion(ver), nil
}

// SetFollowerHandle sets the follower-handle option of the PD client.
func (p *PdController) SetFollowerHandle(val bool) error {
	err := p.pdClient.UpdateOption(opt.EnableFollowerHandle, val)
	if err != nil {
		return errors.Trace(err)
	}
	log.Info("set follower handle", zap.Bool("enable", val))
	return nil
}