// tidb/br/pkg/task/operator/config.go
// Copyright 2023 PingCAP, Inc. Licensed under Apache-2.0.
package operator

import (
	"regexp"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/backup"
	berrors "github.com/pingcap/tidb/br/pkg/errors"
	"github.com/pingcap/tidb/br/pkg/task"
	"github.com/pingcap/tidb/pkg/objstore"
	"github.com/spf13/pflag"
)

const (
	flagTableConcurrency = "table-concurrency"
	flagRestoredTS = "restored-ts"
	flagUpstreamClusterID = "upstream-cluster-id"
	flagStorePatterns = "stores"
	flagTTL = "ttl"
	flagSafePoint = "safepoint"
	flagStorage = "storage"
	flagLoadCreds = "load-creds"
	flagJSON = "json"
	flagRecent = "recent"
	flagTo = "to"
	flagBase = "base"
	flagYes = "yes"
	flagDryRun = "dry-run"
)

// PauseGcConfig is the config for keeping a GC safepoint alive for a TTL.
type PauseGcConfig struct {
	task.Config

	SafePoint uint64 `json:"safepoint" yaml:"safepoint"`
	// SafePointID identifies a specific safepoint.
	// For now this field is used only in tests; don't rely on it in production code.
	SafePointID string `json:"safepoint-id" yaml:"safepoint-id"`
	TTL time.Duration `json:"ttl" yaml:"ttl"`

	// OnAllReady is called once all preparation work is done.
	OnAllReady func() `json:"-" yaml:"-"`
	// OnExit is called when the command exits.
	OnExit func() `json:"-" yaml:"-"`
}

// DefineFlagsForPrepareSnapBackup defines the flags used when preparing for a snapshot backup.
func DefineFlagsForPrepareSnapBackup(f *pflag.FlagSet) {
	_ = f.DurationP(flagTTL, "i", 2*time.Minute, "The time-to-live of the safepoint.")
	_ = f.Uint64P(flagSafePoint, "t", 0, "The GC safepoint to be kept.")
}

// ParseFromFlags fills the config via the flags.
func (cfg *PauseGcConfig) ParseFromFlags(flags *pflag.FlagSet) error {
	if err := cfg.Config.ParseFromFlags(flags); err != nil {
		return err
	}

	var err error
	cfg.SafePoint, err = flags.GetUint64(flagSafePoint)
	if err != nil {
		return err
	}
	cfg.TTL, err = flags.GetDuration(flagTTL)
	if err != nil {
		return err
	}
	return nil
}
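
// A minimal usage sketch (illustrative, not part of the original source). It assumes
// the common task flags read by cfg.Config.ParseFromFlags are defined on the same
// flag set (e.g. via task.DefineCommonFlags); the flag values below are made up.
//
//	flags := pflag.NewFlagSet("pause-gc", pflag.ContinueOnError)
//	task.DefineCommonFlags(flags)
//	DefineFlagsForPrepareSnapBackup(flags)
//	_ = flags.Parse([]string{"--safepoint", "446708740124672000", "--ttl", "5m"})
//	var cfg PauseGcConfig
//	if err := cfg.ParseFromFlags(flags); err != nil {
//		// handle the error
//	}
//	// cfg.SafePoint and cfg.TTL now hold the parsed values.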

// Base64ifyConfig is the config for encoding the external storage backend as a base64 string.
type Base64ifyConfig struct {
	objstore.BackendOptions
	StorageURI string
	LoadCerd bool
}

// DefineFlagsForBase64ifyConfig defines the flags for the base64ify operator command.
func DefineFlagsForBase64ifyConfig(flags *pflag.FlagSet) {
	objstore.DefineFlags(flags)
	flags.StringP(flagStorage, "s", "", "The external storage input.")
	flags.Bool(flagLoadCreds, false, "whether to load the credentials from the current environment and marshal them into the base64 string. [!]")
}

// ParseFromFlags fills the config via the flags.
func (cfg *Base64ifyConfig) ParseFromFlags(flags *pflag.FlagSet) error {
	var err error
	err = cfg.BackendOptions.ParseFromFlags(flags)
	if err != nil {
		return err
	}
	cfg.StorageURI, err = flags.GetString(flagStorage)
	if err != nil {
		return err
	}
	cfg.LoadCerd, err = flags.GetBool(flagLoadCreds)
	if err != nil {
		return err
	}
	return nil
}

// ListMigrationConfig is the config for listing the migrations in the external storage.
type ListMigrationConfig struct {
	objstore.BackendOptions
	StorageURI string
	JSONOutput bool
}

// DefineFlagsForListMigrationConfig defines the flags for listing migrations.
func DefineFlagsForListMigrationConfig(flags *pflag.FlagSet) {
	objstore.DefineFlags(flags)
	flags.StringP(flagStorage, "s", "", "The external storage input.")
	flags.Bool(flagJSON, false, "Output the result in JSON format.")
}

// ParseFromFlags fills the config via the flags.
func (cfg *ListMigrationConfig) ParseFromFlags(flags *pflag.FlagSet) error {
	var err error
	err = cfg.BackendOptions.ParseFromFlags(flags)
	if err != nil {
		return err
	}
	cfg.StorageURI, err = flags.GetString(flagStorage)
	if err != nil {
		return err
	}
	cfg.JSONOutput, err = flags.GetBool(flagJSON)
	if err != nil {
		return err
	}
	return nil
}

// MigrateToConfig is the config for merging migrations into the BASE migration.
type MigrateToConfig struct {
	objstore.BackendOptions
	StorageURI string
	Recent bool
	MigrateTo int
	Base bool
	Yes bool
	DryRun bool
}

// DefineFlagsForMigrateToConfig defines the flags for the migrate-to operator command.
func DefineFlagsForMigrateToConfig(flags *pflag.FlagSet) {
	objstore.DefineFlags(flags)
	flags.StringP(flagStorage, "s", "", "The external storage input.")
	flags.Bool(flagRecent, true, "Migrate to the most recent migration and merge it into BASE.")
	flags.Int(flagTo, 0, "Migrate all migrations from BASE up to the specified sequence number.")
	flags.Bool(flagBase, false, "Don't merge any migrations; just retry running the pending operations in BASE.")
	flags.BoolP(flagYes, "y", false, "Skip effect estimation and confirmation; execute directly.")
	flags.Bool(flagDryRun, false, "Don't actually perform the migration; just print its effect.")
}

// ParseFromFlags fills the config via the flags.
func (cfg *MigrateToConfig) ParseFromFlags(flags *pflag.FlagSet) error {
	var err error
	err = cfg.BackendOptions.ParseFromFlags(flags)
	if err != nil {
		return err
	}
	cfg.StorageURI, err = flags.GetString(flagStorage)
	if err != nil {
		return err
	}
	cfg.Recent, err = flags.GetBool(flagRecent)
	if err != nil {
		return err
	}
	cfg.MigrateTo, err = flags.GetInt(flagTo)
	if err != nil {
		return err
	}
	cfg.Base, err = flags.GetBool(flagBase)
	if err != nil {
		return err
	}
	cfg.Yes, err = flags.GetBool(flagYes)
	if err != nil {
		return err
	}
	cfg.DryRun, err = flags.GetBool(flagDryRun)
	if err != nil {
		return err
	}
	return nil
}

// Verify checks that the flag combination is valid.
func (cfg *MigrateToConfig) Verify() error {
	if cfg.Recent && cfg.MigrateTo != 0 {
		return errors.Annotatef(berrors.ErrInvalidArgument,
			"the --%s and --%s flags cannot be used at the same time",
			flagRecent, flagTo)
	}
	if cfg.Base && (cfg.Recent || cfg.MigrateTo != 0) {
		return errors.Annotatef(berrors.ErrInvalidArgument,
			"the --%s and (--%s or --%s) flags cannot be used at the same time",
			flagBase, flagTo, flagRecent)
	}
	return nil
}
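
// A small sketch (illustrative; values made up) of how Verify rejects conflicting
// targets. Note that --recent defaults to true, so the Recent field must be false
// when a specific --to target is requested for Verify to pass.
//
//	cfg := MigrateToConfig{Recent: true, MigrateTo: 42}
//	err := cfg.Verify() // error: the --recent and --to flags cannot be used at the same time
//
//	cfg = MigrateToConfig{MigrateTo: 42}
//	err = cfg.Verify() // nil: only the --to target is set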

// ForceFlushConfig is the config for force flushing the selected TiKV stores.
type ForceFlushConfig struct {
	task.Config

	// StoresPattern matches the address of TiKV.
	// The address usually looks like "<host>:20160".
	// You may list the stores by `pd-ctl stores`.
	StoresPattern *regexp.Regexp
}

// DefineFlagsForForceFlushConfig defines the flags for the force-flush operator command.
func DefineFlagsForForceFlushConfig(f *pflag.FlagSet) {
	f.String(flagStorePatterns, ".*", "The regexp matching the peer addresses of the stores to be force flushed.")
}

// ParseFromFlags fills the config via the flags.
func (cfg *ForceFlushConfig) ParseFromFlags(flags *pflag.FlagSet) (err error) {
	storePat, err := flags.GetString(flagStorePatterns)
	if err != nil {
		return err
	}
	cfg.StoresPattern, err = regexp.Compile(storePat)
	if err != nil {
		return errors.Annotatef(err, "invalid expression in --%s", flagStorePatterns)
	}
	return cfg.Config.ParseFromFlags(flags)
}
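
// A brief sketch (illustrative; the address is made up) of how the compiled pattern
// is used: only stores whose peer address matches will be force flushed.
//
//	cfg := ForceFlushConfig{StoresPattern: regexp.MustCompile(`:20160$`)}
//	if cfg.StoresPattern.MatchString("tikv-0.internal:20160") {
//		// this store would be selected for force flush
//	}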

// ChecksumWithRewriteRulesConfig is the config for running checksum with rewrite rules applied.
type ChecksumWithRewriteRulesConfig struct {
	task.Config
}

// DefineFlagsForChecksumTableConfig defines the flags for the table checksum operator command.
func DefineFlagsForChecksumTableConfig(f *pflag.FlagSet) {
	f.Uint(flagTableConcurrency, backup.DefaultSchemaConcurrency, "The size of a BR thread pool used for backup table metas, "+
		"including tableInfo/checksum and stats.")
	f.Uint64(flagRestoredTS, 0, "The point in time (TS) at which to run the checksum.")
	f.Uint64(flagUpstreamClusterID, 0, "The upstream cluster ID of the PITR ID map to use.")
}

// DefineFlagsForChecksumUpstreamTableConfig defines the flags for checksumming upstream tables.
func DefineFlagsForChecksumUpstreamTableConfig(f *pflag.FlagSet) {
	f.Uint(flagTableConcurrency, backup.DefaultSchemaConcurrency, "The size of a BR thread pool used for backup table metas, "+
		"including tableInfo/checksum and stats.")
	f.Uint64(flagRestoredTS, 0, "The point in time (TS) at which to run the checksum.")
}

// DefineFlagsForChecksumPitrTableConfig defines the flags for checksumming tables restored by PITR.
func DefineFlagsForChecksumPitrTableConfig(f *pflag.FlagSet) {
	f.Uint(flagTableConcurrency, backup.DefaultSchemaConcurrency, "The size of a BR thread pool used for backup table metas, "+
		"including tableInfo/checksum and stats.")
	f.Uint64(flagRestoredTS, 0, "The point in time (TS) at which to run the checksum.")
	f.Uint64(flagUpstreamClusterID, 0, "The upstream cluster ID of the PITR ID map to use.")
}

// ParseFromFlags fills the config via the flags.
func (cfg *ChecksumWithRewriteRulesConfig) ParseFromFlags(flags *pflag.FlagSet) (err error) {
	cfg.TableConcurrency, err = flags.GetUint(flagTableConcurrency)
	if err != nil {
		return err
	}
	return cfg.Config.ParseFromFlags(flags)
}

// ChecksumWithPitrIdMapConfig is the config for running checksum with the PITR ID map of the upstream cluster.
type ChecksumWithPitrIdMapConfig struct {
	task.RestoreConfig
}

// ParseFromFlags fills the config via the flags.
func (cfg *ChecksumWithPitrIdMapConfig) ParseFromFlags(flags *pflag.FlagSet) (err error) {
	cfg.TableConcurrency, err = flags.GetUint(flagTableConcurrency)
	if err != nil {
		return errors.Trace(err)
	}
	cfg.RestoreTS, err = flags.GetUint64(flagRestoredTS)
	if err != nil {
		return errors.Trace(err)
	}
	cfg.UpstreamClusterID, err = flags.GetUint64(flagUpstreamClusterID)
	if err != nil {
		return errors.Trace(err)
	}
	return cfg.Config.ParseFromFlags(flags)
}

// ChecksumUpstreamConfig is the config for running checksum against the upstream cluster.
type ChecksumUpstreamConfig struct {
	task.RestoreConfig
}

// ParseFromFlags fills the config via the flags.
func (cfg *ChecksumUpstreamConfig) ParseFromFlags(flags *pflag.FlagSet) (err error) {
	cfg.TableConcurrency, err = flags.GetUint(flagTableConcurrency)
	if err != nil {
		return errors.Trace(err)
	}
	cfg.RestoreTS, err = flags.GetUint64(flagRestoredTS)
	if err != nil {
		return errors.Trace(err)
	}
	return cfg.Config.ParseFromFlags(flags)
}