tidb/br/pkg/task/restore.go
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
package task
import (
"bytes"
"cmp"
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"os"
"slices"
"strings"
"sync/atomic"
"time"
"github.com/docker/go-units"
"github.com/google/uuid"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/checkpoint"
pconfig "github.com/pingcap/tidb/br/pkg/config"
"github.com/pingcap/tidb/br/pkg/conn"
connutil "github.com/pingcap/tidb/br/pkg/conn/util"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/registry"
"github.com/pingcap/tidb/br/pkg/restore"
snapclient "github.com/pingcap/tidb/br/pkg/restore/snap_client"
"github.com/pingcap/tidb/br/pkg/restore/tiflashrec"
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/storeapi"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/http"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
const (
flagOnline = "online"
flagNoSchema = "no-schema"
flagLoadStats = "load-stats"
flagFastLoadSysTables = "fast-load-sys-tables"
flagGranularity = "granularity"
flagConcurrencyPerStore = "tikv-max-restore-concurrency"
flagAllowPITRFromIncremental = "allow-pitr-from-incremental"
// FlagMergeRegionSizeBytes is the flag name of merge small regions by size
FlagMergeRegionSizeBytes = "merge-region-size-bytes"
// FlagMergeRegionKeyCount is the flag name of merge small regions by key count
FlagMergeRegionKeyCount = "merge-region-key-count"
// FlagPDConcurrency controls the concurrency of PD-related operations like split & scatter.
FlagPDConcurrency = "pd-concurrency"
// FlagStatsConcurrency controls the concurrency of restoring statistics.
FlagStatsConcurrency = "stats-concurrency"
// FlagBatchFlushInterval controls after how long the restore batch would be auto sent.
FlagBatchFlushInterval = "batch-flush-interval"
// FlagDdlBatchSize controls the batch size of DDLs used to create tables in batches
FlagDdlBatchSize = "ddl-batch-size"
// FlagWithPlacementPolicy corresponds to tidb config with-tidb-placement-mode
// currently only STRICT or IGNORE are supported, the default is STRICT according to tidb.
FlagWithPlacementPolicy = "with-tidb-placement-mode"
// FlagKeyspaceName corresponds to tidb config keyspace-name
FlagKeyspaceName = "keyspace-name"
// flagCheckpointStorage specifies the external storage URL where checkpoint data is saved.
flagCheckpointStorage = "checkpoint-storage"
// FlagWaitTiFlashReady represents whether to wait for TiFlash replicas to be ready after tables are restored and checksummed.
FlagWaitTiFlashReady = "wait-tiflash-ready"
// FlagStreamStartTS and FlagStreamRestoreTS are used for the log restore timestamp range.
FlagStreamStartTS = "start-ts"
FlagStreamRestoreTS = "restored-ts"
// FlagStreamFullBackupStorage is used for log restore, represents the full backup storage.
FlagStreamFullBackupStorage = "full-backup-storage"
// FlagPiTRBatchCount and FlagPiTRBatchSize are used for restore log with batch method.
FlagPiTRBatchCount = "pitr-batch-count"
FlagPiTRBatchSize = "pitr-batch-size"
FlagPiTRConcurrency = "pitr-concurrency"
FlagResetSysUsers = "reset-sys-users"
FlagSysCheckCollation = "sys-check-collation"
defaultPiTRBatchCount = 8
defaultPiTRBatchSize = 16 * 1024 * 1024
defaultRestoreConcurrency = 128
defaultPiTRConcurrency = 16
defaultPDConcurrency = 1
defaultStatsConcurrency = 12
defaultBatchFlushInterval = 16 * time.Second
defaultFlagDdlBatchSize = 128
)
const (
FullRestoreCmd = "Full Restore"
DBRestoreCmd = "DataBase Restore"
TableRestoreCmd = "Table Restore"
PointRestoreCmd = "Point Restore"
RawRestoreCmd = "Raw Restore"
TxnRestoreCmd = "Txn Restore"
)
// RestoreCommonConfig is the common configuration for all BR restore tasks.
type RestoreCommonConfig struct {
Online bool `json:"online" toml:"online"`
Granularity string `json:"granularity" toml:"granularity"`
ConcurrencyPerStore pconfig.ConfigTerm[uint] `json:"tikv-max-restore-concurrency" toml:"tikv-max-restore-concurrency"`
// MergeSmallRegionSizeBytes is the threshold of merging small regions (Default 96MB, region split size).
// MergeSmallRegionKeyCount is the threshold of merging small regions (Default 960_000, region split key count).
// See https://github.com/tikv/tikv/blob/v4.0.8/components/raftstore/src/coprocessor/config.rs#L35-L38
MergeSmallRegionSizeBytes pconfig.ConfigTerm[uint64] `json:"merge-region-size-bytes" toml:"merge-region-size-bytes"`
MergeSmallRegionKeyCount pconfig.ConfigTerm[uint64] `json:"merge-region-key-count" toml:"merge-region-key-count"`
// determines whether to restore system tables by default, see fullClusterRestore in restore/client.go
WithSysTable bool `json:"with-sys-table" toml:"with-sys-table"`
ResetSysUsers []string `json:"reset-sys-users" toml:"reset-sys-users"`
SysCheckCollation bool `json:"sys-check-collation" toml:"sys-check-collation"`
}
// adjust adjusts abnormal config values in the current config.
// useful when not starting BR from the CLI (e.g. from BRIE in SQL).
func (cfg *RestoreCommonConfig) adjust() {
if !cfg.MergeSmallRegionKeyCount.Modified {
cfg.MergeSmallRegionKeyCount.Value = conn.DefaultMergeRegionKeyCount
}
if !cfg.MergeSmallRegionSizeBytes.Modified {
cfg.MergeSmallRegionSizeBytes.Value = conn.DefaultMergeRegionSizeBytes
}
if len(cfg.Granularity) == 0 {
cfg.Granularity = string(restore.CoarseGrained)
}
if !cfg.ConcurrencyPerStore.Modified {
cfg.ConcurrencyPerStore.Value = conn.DefaultImportNumGoroutines
}
}
// DefineRestoreCommonFlags defines common flags for the restore command.
func DefineRestoreCommonFlags(flags *pflag.FlagSet) {
// TODO remove experimental tag if it's stable
flags.Bool(flagOnline, false, "(experimental) Whether to restore online")
flags.String(flagGranularity, string(restore.CoarseGrained), "(deprecated) Whether to split & scatter regions in a fine-grained way during restore")
flags.Uint(flagConcurrencyPerStore, 128, "The size of thread pool on each store that executes tasks")
flags.Uint32(flagConcurrency, 128, "(deprecated) The size of thread pool on BR that executes tasks, "+
"where each task restores one SST file to TiKV")
flags.Uint64(FlagMergeRegionSizeBytes, conn.DefaultMergeRegionSizeBytes,
"the threshold of merging small regions (Default 96MB, region split size)")
flags.Uint64(FlagMergeRegionKeyCount, conn.DefaultMergeRegionKeyCount,
"the threshold of merging small regions (Default 960_000, region split key count)")
flags.Uint(FlagPDConcurrency, defaultPDConcurrency,
"(deprecated) concurrency pd-relative operations like split & scatter.")
flags.Uint(FlagStatsConcurrency, defaultStatsConcurrency,
"concurrency to restore statistic")
flags.Duration(FlagBatchFlushInterval, defaultBatchFlushInterval,
"after how long a restore batch would be auto sent.")
flags.Uint(FlagDdlBatchSize, defaultFlagDdlBatchSize,
"batch size for ddl to create a batch of tables once.")
flags.Bool(flagWithSysTable, true, "whether restore system privilege tables on default setting")
flags.StringArrayP(FlagResetSysUsers, "", []string{"cloud_admin", "root"}, "whether reset these users after restoration")
flags.Bool(flagUseFSR, false, "whether enable FSR for AWS snapshots")
_ = flags.MarkHidden(FlagResetSysUsers)
_ = flags.MarkHidden(FlagMergeRegionSizeBytes)
_ = flags.MarkHidden(FlagMergeRegionKeyCount)
_ = flags.MarkHidden(FlagPDConcurrency)
_ = flags.MarkHidden(FlagStatsConcurrency)
_ = flags.MarkHidden(FlagBatchFlushInterval)
_ = flags.MarkHidden(FlagDdlBatchSize)
_ = flags.MarkHidden(flagUseFSR)
}
// ParseFromFlags parses the config from the flag set.
func (cfg *RestoreCommonConfig) ParseFromFlags(flags *pflag.FlagSet) error {
var err error
cfg.Online, err = flags.GetBool(flagOnline)
if err != nil {
return errors.Trace(err)
}
cfg.Granularity, err = flags.GetString(flagGranularity)
if err != nil {
return errors.Trace(err)
}
cfg.ConcurrencyPerStore.Value, err = flags.GetUint(flagConcurrencyPerStore)
if err != nil {
return errors.Trace(err)
}
cfg.ConcurrencyPerStore.Modified = flags.Changed(flagConcurrencyPerStore)
cfg.MergeSmallRegionKeyCount.Value, err = flags.GetUint64(FlagMergeRegionKeyCount)
if err != nil {
return errors.Trace(err)
}
cfg.MergeSmallRegionKeyCount.Modified = flags.Changed(FlagMergeRegionKeyCount)
cfg.MergeSmallRegionSizeBytes.Value, err = flags.GetUint64(FlagMergeRegionSizeBytes)
if err != nil {
return errors.Trace(err)
}
cfg.MergeSmallRegionSizeBytes.Modified = flags.Changed(FlagMergeRegionSizeBytes)
if flags.Lookup(flagWithSysTable) != nil {
cfg.WithSysTable, err = flags.GetBool(flagWithSysTable)
if err != nil {
return errors.Trace(err)
}
}
cfg.ResetSysUsers, err = flags.GetStringArray(FlagResetSysUsers)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(err)
}
// RestoreConfig is the configuration specific for restore tasks.
type RestoreConfig struct {
Config
RestoreCommonConfig
NoSchema bool `json:"no-schema" toml:"no-schema"`
LoadStats bool `json:"load-stats" toml:"load-stats"`
FastLoadSysTables bool `json:"fast-load-sys-tables" toml:"fast-load-sys-tables"`
PDConcurrency uint `json:"pd-concurrency" toml:"pd-concurrency"`
StatsConcurrency uint `json:"stats-concurrency" toml:"stats-concurrency"`
BatchFlushInterval time.Duration `json:"batch-flush-interval" toml:"batch-flush-interval"`
// DdlBatchSize is used to define the batch size of DDLs to create tables
DdlBatchSize uint `json:"ddl-batch-size" toml:"ddl-batch-size"`
WithPlacementPolicy string `json:"with-tidb-placement-mode" toml:"with-tidb-placement-mode"`
// FullBackupStorage is used to run `restore full` before `restore log`.
// if it is empty, only the log restore is performed.
FullBackupStorage string `json:"full-backup-storage" toml:"full-backup-storage"`
// AllowPITRFromIncremental indicates whether this restore should enter a compatibility mode for incremental restore.
// In this restore mode, the restore will not perform timestamp rewrite on the incremental data.
AllowPITRFromIncremental bool `json:"allow-pitr-from-incremental" toml:"allow-pitr-from-incremental"`
// [startTs, RestoreTS] is used to `restore log` from StartTS to RestoreTS.
StartTS uint64 `json:"start-ts" toml:"start-ts"`
// if not specified system will restore to the max TS available
RestoreTS uint64 `json:"restore-ts" toml:"restore-ts"`
// whether RestoreTS was explicitly specified by user vs auto-detected
IsRestoredTSUserSpecified bool `json:"-" toml:"-"`
// rewriteTS is the rewritten timestamp of meta kvs.
RewriteTS uint64 `json:"-" toml:"-"`
tiflashRecorder *tiflashrec.TiFlashRecorder `json:"-" toml:"-"`
PitrBatchCount uint32 `json:"pitr-batch-count" toml:"pitr-batch-count"`
PitrBatchSize uint32 `json:"pitr-batch-size" toml:"pitr-batch-size"`
PitrConcurrency uint32 `json:"-" toml:"-"`
UseCheckpoint bool `json:"use-checkpoint" toml:"use-checkpoint"`
CheckpointStorage string `json:"checkpoint-storage" toml:"checkpoint-storage"`
UpstreamClusterID uint64 `json:"-" toml:"-"`
snapshotCheckpointMetaManager checkpoint.SnapshotMetaManagerT `json:"-" toml:"-"`
logCheckpointMetaManager checkpoint.LogMetaManagerT `json:"-" toml:"-"`
sstCheckpointMetaManager checkpoint.SnapshotMetaManagerT `json:"-" toml:"-"`
RestoreID uint64 `json:"-" toml:"-"`
RestoreRegistry *registry.Registry `json:"-" toml:"-"`
WaitTiflashReady bool `json:"wait-tiflash-ready" toml:"wait-tiflash-ready"`
// PITR-related fields for blocklist creation
// RestoreStartTS is the timestamp when the restore operation began (before any table creation).
// This is used for blocklist files to accurately mark when tables were created.
RestoreStartTS uint64 `json:"-" toml:"-"`
tableMappingManager *stream.TableMappingManager `json:"-" toml:"-"`
// for ebs-based restore
FullBackupType FullBackupType `json:"full-backup-type" toml:"full-backup-type"`
Prepare bool `json:"prepare" toml:"prepare"`
OutputFile string `json:"output-file" toml:"output-file"`
SkipAWS bool `json:"skip-aws" toml:"skip-aws"`
CloudAPIConcurrency uint `json:"cloud-api-concurrency" toml:"cloud-api-concurrency"`
VolumeType pconfig.EBSVolumeType `json:"volume-type" toml:"volume-type"`
VolumeIOPS int64 `json:"volume-iops" toml:"volume-iops"`
VolumeThroughput int64 `json:"volume-throughput" toml:"volume-throughput"`
VolumeEncrypted bool `json:"volume-encrypted" toml:"volume-encrypted"`
ProgressFile string `json:"progress-file" toml:"progress-file"`
TargetAZ string `json:"target-az" toml:"target-az"`
UseFSR bool `json:"use-fsr" toml:"use-fsr"`
}
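// LocalEncryptionEnabled reports whether a non-plaintext cipher is configured for the backup data.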
func (cfg *RestoreConfig) LocalEncryptionEnabled() bool {
return cfg.CipherInfo.CipherType != encryptionpb.EncryptionMethod_PLAINTEXT
}
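// immutableRestoreConfig holds the restore settings that are serialized and hashed by Hash to identify a restore task.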
type immutableRestoreConfig struct {
CmdName string
UpstreamClusterID uint64
Storage string
ExplictFilter bool
FilterStr []string
WithSysTable bool
FastLoadSysTables bool
LoadStats bool
}
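// Hash marshals the immutable parts of the restore config, with the storage URL redacted, and returns the SHA-256 digest of the result.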
func (cfg *RestoreConfig) Hash(cmdName string) ([]byte, error) {
config := immutableRestoreConfig{
CmdName: cmdName,
UpstreamClusterID: cfg.UpstreamClusterID,
Storage: ast.RedactURL(cfg.Storage),
FilterStr: cfg.FilterStr,
WithSysTable: cfg.WithSysTable,
FastLoadSysTables: cfg.FastLoadSysTables,
LoadStats: cfg.LoadStats,
}
data, err := json.Marshal(config)
if err != nil {
return nil, errors.Trace(err)
}
hash := sha256.Sum256(data)
log.Info("hash of restore config", zap.String("hash", hex.EncodeToString(hash[:])), zap.String("config", string(data)))
return hash[:], nil
}
// DefineRestoreFlags defines common flags for the restore tidb command.
func DefineRestoreFlags(flags *pflag.FlagSet) {
flags.Bool(flagNoSchema, false, "skip creating schemas and tables, reuse existing empty ones")
flags.Bool(flagLoadStats, true, "Run load stats or update stats_meta to trigger auto-analyze at end of snapshot restore task")
flags.Bool(flagFastLoadSysTables, true, "load system tables (including statistics) by renaming the temporary system tables")
// Do not expose this flag
_ = flags.MarkHidden(flagNoSchema)
flags.String(FlagWithPlacementPolicy, "STRICT", "corresponds to tidb global/session variable with-tidb-placement-mode")
flags.String(FlagKeyspaceName, "", "corresponds to tidb config keyspace-name")
flags.Bool(flagUseCheckpoint, true, "use checkpoint mode")
_ = flags.MarkHidden(flagUseCheckpoint)
flags.String(flagCheckpointStorage, "", "specify the external storage url where checkpoint data is saved, eg, s3://bucket/path/prefix")
flags.Bool(FlagWaitTiFlashReady, false, "whether to wait for TiFlash replicas to be ready if TiFlash exists")
flags.Bool(flagAllowPITRFromIncremental, true, "whether to make the incremental restore compatible with a later log restore."+
" The default is true: the incremental restore will not rewrite the incremental data,"+
" and it will refuse to restore the three backfill-type ddl jobs:"+
" Add index, Modify column and Reorganize partition")
flags.Bool(FlagSysCheckCollation, false, "whether to check the privilege table rows to permit restoring the privilege data"+
" from a utf8mb4_bin collated column to a utf8mb4_general_ci collated column")
DefineRestoreCommonFlags(flags)
}
// DefineStreamRestoreFlags defines flags for the restore log command.
func DefineStreamRestoreFlags(command *cobra.Command) {
command.Flags().String(FlagStreamStartTS, "", "the start timestamp from which the log restore starts.\n"+
"support TSO or datetime, e.g. '400036290571534337' or '2018-05-11 01:42:23+0800'")
command.Flags().String(FlagStreamRestoreTS, "", "the point of restore, used for log restore.\n"+
"support TSO or datetime, e.g. '400036290571534337' or '2018-05-11 01:42:23+0800'")
command.Flags().String(FlagStreamFullBackupStorage, "", "specify the full backup storage. "+
"fill it in to restore the full backup before restoring the log.")
command.Flags().Uint32(FlagPiTRBatchCount, defaultPiTRBatchCount, "specify the batch count to restore log.")
command.Flags().Uint32(FlagPiTRBatchSize, defaultPiTRBatchSize, "specify the batch size to restore log.")
command.Flags().Uint32(FlagPiTRConcurrency, defaultPiTRConcurrency, "specify the concurrency to restore log.")
}
// ParseStreamRestoreFlags parses the `restore stream` flags from the flag set.
func (cfg *RestoreConfig) ParseStreamRestoreFlags(flags *pflag.FlagSet) error {
tsString, err := flags.GetString(FlagStreamStartTS)
if err != nil {
return errors.Trace(err)
}
if cfg.StartTS, err = ParseTSString(tsString, true); err != nil {
return errors.Trace(err)
}
tsString, err = flags.GetString(FlagStreamRestoreTS)
if err != nil {
return errors.Trace(err)
}
if cfg.RestoreTS, err = ParseTSString(tsString, true); err != nil {
return errors.Trace(err)
}
// check if RestoreTS was explicitly specified by user
cfg.IsRestoredTSUserSpecified = flags.Changed(FlagStreamRestoreTS)
if cfg.FullBackupStorage, err = flags.GetString(FlagStreamFullBackupStorage); err != nil {
return errors.Trace(err)
}
if cfg.StartTS > 0 && len(cfg.FullBackupStorage) > 0 {
return errors.Annotatef(berrors.ErrInvalidArgument, "%v and %v are mutually exclusive",
FlagStreamStartTS, FlagStreamFullBackupStorage)
}
if cfg.PitrBatchCount, err = flags.GetUint32(FlagPiTRBatchCount); err != nil {
return errors.Trace(err)
}
if cfg.PitrBatchSize, err = flags.GetUint32(FlagPiTRBatchSize); err != nil {
return errors.Trace(err)
}
if cfg.PitrConcurrency, err = flags.GetUint32(FlagPiTRConcurrency); err != nil {
return errors.Trace(err)
}
return nil
}
// ParseFromFlags parses the restore-related flags from the flag set.
func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet, skipCommonConfig bool) error {
var err error
cfg.NoSchema, err = flags.GetBool(flagNoSchema)
if err != nil {
return errors.Trace(err)
}
cfg.LoadStats, err = flags.GetBool(flagLoadStats)
if err != nil {
return errors.Trace(err)
}
cfg.FastLoadSysTables, err = flags.GetBool(flagFastLoadSysTables)
if err != nil {
return errors.Trace(err)
}
// parse common config if needed
if !skipCommonConfig {
err = cfg.Config.ParseFromFlags(flags)
if err != nil {
return errors.Trace(err)
}
}
err = cfg.RestoreCommonConfig.ParseFromFlags(flags)
if err != nil {
return errors.Trace(err)
}
cfg.Concurrency, err = flags.GetUint32(flagConcurrency)
if err != nil {
return errors.Trace(err)
}
if cfg.Config.Concurrency == 0 {
cfg.Config.Concurrency = defaultRestoreConcurrency
}
cfg.PDConcurrency, err = flags.GetUint(FlagPDConcurrency)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", FlagPDConcurrency)
}
cfg.StatsConcurrency, err = flags.GetUint(FlagStatsConcurrency)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", FlagStatsConcurrency)
}
cfg.BatchFlushInterval, err = flags.GetDuration(FlagBatchFlushInterval)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", FlagBatchFlushInterval)
}
cfg.DdlBatchSize, err = flags.GetUint(FlagDdlBatchSize)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", FlagDdlBatchSize)
}
cfg.WithPlacementPolicy, err = flags.GetString(FlagWithPlacementPolicy)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", FlagWithPlacementPolicy)
}
cfg.KeyspaceName, err = flags.GetString(FlagKeyspaceName)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", FlagKeyspaceName)
}
cfg.UseCheckpoint, err = flags.GetBool(flagUseCheckpoint)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", flagUseCheckpoint)
}
cfg.CheckpointStorage, err = flags.GetString(flagCheckpointStorage)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", flagCheckpointStorage)
}
cfg.WaitTiflashReady, err = flags.GetBool(FlagWaitTiFlashReady)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", FlagWaitTiFlashReady)
}
cfg.SysCheckCollation, err = flags.GetBool(FlagSysCheckCollation)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", FlagSysCheckCollation)
}
cfg.AllowPITRFromIncremental, err = flags.GetBool(flagAllowPITRFromIncremental)
if err != nil {
return errors.Annotatef(err, "failed to get flag %s", flagAllowPITRFromIncremental)
}
if flags.Lookup(flagFullBackupType) != nil {
// for restore full only
fullBackupType, err := flags.GetString(flagFullBackupType)
if err != nil {
return errors.Trace(err)
}
if !FullBackupType(fullBackupType).Valid() {
return errors.New("invalid full backup type")
}
cfg.FullBackupType = FullBackupType(fullBackupType)
cfg.Prepare, err = flags.GetBool(flagPrepare)
if err != nil {
return errors.Trace(err)
}
cfg.SkipAWS, err = flags.GetBool(flagSkipAWS)
if err != nil {
return errors.Trace(err)
}
cfg.CloudAPIConcurrency, err = flags.GetUint(flagCloudAPIConcurrency)
if err != nil {
return errors.Trace(err)
}
cfg.OutputFile, err = flags.GetString(flagOutputMetaFile)
if err != nil {
return errors.Trace(err)
}
volumeType, err := flags.GetString(flagVolumeType)
if err != nil {
return errors.Trace(err)
}
cfg.VolumeType = pconfig.EBSVolumeType(volumeType)
if !cfg.VolumeType.Valid() {
return errors.New("invalid volume type: " + volumeType)
}
if cfg.VolumeIOPS, err = flags.GetInt64(flagVolumeIOPS); err != nil {
return errors.Trace(err)
}
if cfg.VolumeThroughput, err = flags.GetInt64(flagVolumeThroughput); err != nil {
return errors.Trace(err)
}
if cfg.VolumeEncrypted, err = flags.GetBool(flagVolumeEncrypted); err != nil {
return errors.Trace(err)
}
cfg.ProgressFile, err = flags.GetString(flagProgressFile)
if err != nil {
return errors.Trace(err)
}
cfg.TargetAZ, err = flags.GetString(flagTargetAZ)
if err != nil {
return errors.Trace(err)
}
cfg.UseFSR, err = flags.GetBool(flagUseFSR)
if err != nil {
return errors.Trace(err)
}
// iops: gp3 [3,000-16,000]; io1/io2 [100-32,000]
// throughput: gp3 [125, 1000]; io1/io2 cannot set throughput
// io1 and io2 volumes support up to 64,000 IOPS only on Instances built on the Nitro System.
// Other instance families support performance up to 32,000 IOPS.
// https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateVolume.html
// todo: check lower/upper bound
}
return nil
}
// Adjust is used for both the BR binary and BR in TiDB.
// When a new config is added and not included in the parser,
// we should set a proper value in this function
// so that both the binary and TiDB use the same default value.
func (cfg *RestoreConfig) Adjust() {
cfg.Config.adjust()
cfg.RestoreCommonConfig.adjust()
if cfg.Config.Concurrency == 0 {
cfg.Config.Concurrency = defaultRestoreConcurrency
}
if cfg.Config.SwitchModeInterval == 0 {
cfg.Config.SwitchModeInterval = defaultSwitchInterval
}
if cfg.PDConcurrency == 0 {
cfg.PDConcurrency = defaultPDConcurrency
}
if cfg.StatsConcurrency == 0 {
cfg.StatsConcurrency = defaultStatsConcurrency
}
if cfg.BatchFlushInterval == 0 {
cfg.BatchFlushInterval = defaultBatchFlushInterval
}
if cfg.DdlBatchSize == 0 {
cfg.DdlBatchSize = defaultFlagDdlBatchSize
}
if cfg.CloudAPIConcurrency == 0 {
cfg.CloudAPIConcurrency = defaultCloudAPIConcurrency
}
}
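// adjustRestoreConfigForStreamRestore fills in default values for the PiTR-specific options
// and reserves one extra worker for iterating the backup files.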
func (cfg *RestoreConfig) adjustRestoreConfigForStreamRestore() {
if cfg.PitrConcurrency == 0 {
cfg.PitrConcurrency = defaultPiTRConcurrency
}
if cfg.PitrBatchCount == 0 {
cfg.PitrBatchCount = defaultPiTRBatchCount
}
if cfg.PitrBatchSize == 0 {
cfg.PitrBatchSize = defaultPiTRBatchSize
}
// another goroutine is used to iterate the backup file
cfg.PitrConcurrency += 1
log.Info("set restore kv files concurrency", zap.Int("concurrency", int(cfg.PitrConcurrency)))
if cfg.ConcurrencyPerStore.Value > 0 {
log.Info("set restore compacted sst files concurrency per store",
zap.Int("concurrency", int(cfg.ConcurrencyPerStore.Value)))
}
}
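// newStorageCheckpointMetaManagerPITR creates the storage-based checkpoint meta managers used by PiTR:
// the log and sst managers are always created, and the snapshot manager only when a full backup storage is configured.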
func (cfg *RestoreConfig) newStorageCheckpointMetaManagerPITR(
ctx context.Context,
downstreamClusterID uint64,
restoreID uint64,
) error {
log.Info("creating storage checkpoint meta managers for PiTR",
zap.Uint64("restoreID", restoreID),
zap.Uint64("downstreamClusterID", downstreamClusterID),
zap.String("checkpointStorage", cfg.CheckpointStorage),
zap.Bool("hasFullBackupStorage", len(cfg.FullBackupStorage) > 0))
_, checkpointStorage, err := GetStorage(ctx, cfg.CheckpointStorage, &cfg.Config)
if err != nil {
return errors.Trace(err)
}
if len(cfg.FullBackupStorage) > 0 {
cfg.snapshotCheckpointMetaManager = checkpoint.NewSnapshotStorageMetaManager(
checkpointStorage, &cfg.CipherInfo, downstreamClusterID, "snapshot", restoreID)
log.Info("created snapshot checkpoint meta manager",
zap.Uint64("restoreID", restoreID))
}
cfg.logCheckpointMetaManager = checkpoint.NewLogStorageMetaManager(
checkpointStorage, &cfg.CipherInfo, downstreamClusterID, "log", restoreID)
log.Info("created log checkpoint meta manager",
zap.Uint64("restoreID", restoreID))
cfg.sstCheckpointMetaManager = checkpoint.NewSnapshotStorageMetaManager(
checkpointStorage, &cfg.CipherInfo, downstreamClusterID, "sst", restoreID)
log.Info("created sst checkpoint meta manager",
zap.Uint64("restoreID", restoreID))
return nil
}
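// newStorageCheckpointMetaManagerSnapshot creates the storage-based snapshot checkpoint meta manager if it does not exist yet.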
func (cfg *RestoreConfig) newStorageCheckpointMetaManagerSnapshot(
ctx context.Context,
downstreamClusterID uint64,
restoreID uint64,
) error {
if cfg.snapshotCheckpointMetaManager != nil {
return nil
}
_, checkpointStorage, err := GetStorage(ctx, cfg.CheckpointStorage, &cfg.Config)
if err != nil {
return errors.Trace(err)
}
cfg.snapshotCheckpointMetaManager = checkpoint.NewSnapshotStorageMetaManager(
checkpointStorage, &cfg.CipherInfo, downstreamClusterID, "snapshot", restoreID)
return nil
}
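// newTableCheckpointMetaManagerPITR creates the table-based (stored in the downstream TiDB) checkpoint meta managers
// used by PiTR, mirroring newStorageCheckpointMetaManagerPITR.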
func (cfg *RestoreConfig) newTableCheckpointMetaManagerPITR(g glue.Glue, dom *domain.Domain, id uint64) (err error) {
log.Info("creating table checkpoint meta managers for PiTR",
zap.Uint64("restoreID", id),
zap.Bool("hasFullBackupStorage", len(cfg.FullBackupStorage) > 0))
if len(cfg.FullBackupStorage) > 0 {
if cfg.snapshotCheckpointMetaManager, err = checkpoint.NewSnapshotTableMetaManager(
g, dom, checkpoint.SnapshotRestoreCheckpointDatabaseName, id,
); err != nil {
return errors.Trace(err)
}
log.Info("created snapshot table checkpoint meta manager",
zap.Uint64("restoreID", id))
}
if cfg.logCheckpointMetaManager, err = checkpoint.NewLogTableMetaManager(
g, dom, checkpoint.LogRestoreCheckpointDatabaseName, id,
); err != nil {
return errors.Trace(err)
}
log.Info("created log table checkpoint meta manager",
zap.Uint64("restoreID", id))
if cfg.sstCheckpointMetaManager, err = checkpoint.NewSnapshotTableMetaManager(
g, dom, checkpoint.CustomSSTRestoreCheckpointDatabaseName, id,
); err != nil {
return errors.Trace(err)
}
log.Info("created sst table checkpoint meta manager",
zap.Uint64("restoreID", id))
return nil
}
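// newTableCheckpointMetaManagerSnapshot creates the table-based snapshot checkpoint meta manager if it does not exist yet.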
func (cfg *RestoreConfig) newTableCheckpointMetaManagerSnapshot(g glue.Glue, dom *domain.Domain, id uint64) (err error) {
if cfg.snapshotCheckpointMetaManager != nil {
return nil
}
if cfg.snapshotCheckpointMetaManager, err = checkpoint.NewSnapshotTableMetaManager(
g, dom, checkpoint.SnapshotRestoreCheckpointDatabaseName, id,
); err != nil {
return errors.Trace(err)
}
return nil
}
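// CloseCheckpointMetaManager closes all checkpoint meta managers that have been created.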
func (cfg *RestoreConfig) CloseCheckpointMetaManager() {
if cfg.logCheckpointMetaManager != nil {
cfg.logCheckpointMetaManager.Close()
}
if cfg.snapshotCheckpointMetaManager != nil {
cfg.snapshotCheckpointMetaManager.Close()
}
if cfg.sstCheckpointMetaManager != nil {
cfg.sstCheckpointMetaManager.Close()
}
}
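// configureRestoreClient applies the restore configuration (rate limit, cipher, DDL batch size,
// placement policy mode, etc.) to the snapshot restore client.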
func configureRestoreClient(ctx context.Context, client *snapclient.SnapClient, cfg *RestoreConfig) error {
client.SetRateLimit(cfg.RateLimit)
client.SetCrypter(&cfg.CipherInfo)
if cfg.NoSchema {
client.EnableSkipCreateSQL()
}
client.SetBatchDdlSize(cfg.DdlBatchSize)
client.SetPlacementPolicyMode(cfg.WithPlacementPolicy)
client.SetWithSysTable(cfg.WithSysTable)
client.SetRewriteMode(ctx)
client.SetCheckPrivilegeTableRowsCollateCompatiblity(cfg.SysCheckCollation)
return nil
}
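// CheckNewCollationEnable reads new_collation_enabled from the downstream cluster, initializes the collate package
// accordingly, and verifies that it matches the value recorded in the backup. It returns whether new collation is
// enabled downstream.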
func CheckNewCollationEnable(
backupNewCollationEnable string,
g glue.Glue,
storage kv.Storage,
CheckRequirements bool,
) (bool, error) {
se, err := g.CreateSession(storage)
if err != nil {
return false, errors.Trace(err)
}
newCollationEnable, err := se.GetGlobalVariable(utils.GetTidbNewCollationEnabled())
if err != nil {
return false, errors.Trace(err)
}
// collate.newCollationEnabled is set to 1 when the collate package is initialized,
// so we need to modify this value according to the config of the cluster
// before using the collate package.
enabled := newCollationEnable == "True"
// modify collate.newCollationEnabled according to the config of the cluster
collate.SetNewCollationEnabledForTest(enabled)
log.Info(fmt.Sprintf("set %s", utils.TidbNewCollationEnabled), zap.Bool("new_collation_enabled", enabled))
if backupNewCollationEnable == "" {
if CheckRequirements {
return enabled, errors.Annotatef(berrors.ErrUnknown,
"the value '%s' not found in backupmeta. "+
"you can use \"SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME='%s';\" to manually check the config. "+
"if you ensure the value '%s' in backup cluster is as same as restore cluster, use --check-requirements=false to skip this check",
utils.TidbNewCollationEnabled, utils.TidbNewCollationEnabled, utils.TidbNewCollationEnabled)
}
log.Warn(fmt.Sprintf("the config '%s' is not in backupmeta", utils.TidbNewCollationEnabled))
return enabled, nil
}
if !strings.EqualFold(backupNewCollationEnable, newCollationEnable) {
return enabled, errors.Annotatef(berrors.ErrUnknown,
"the config '%s' not match, upstream:%v, downstream: %v",
utils.TidbNewCollationEnabled, backupNewCollationEnable, newCollationEnable)
}
return enabled, nil
}
// VerifyDBAndTableInBackup is used to check whether the dbs or tables to restore have been backed up
func VerifyDBAndTableInBackup(schemas []*metautil.Database, cfg *RestoreConfig) error {
if len(cfg.Schemas) == 0 && len(cfg.Tables) == 0 {
return nil
}
schemasMap := make(map[string]struct{})
tablesMap := make(map[string]struct{})
for _, db := range schemas {
dbName := db.Info.Name.L
if dbCIStrName, ok := utils.GetSysDBCIStrName(db.Info.Name); utils.IsSysDB(dbCIStrName.O) && ok {
dbName = dbCIStrName.L
}
schemasMap[utils.EncloseName(dbName)] = struct{}{}
for _, table := range db.Tables {
if table.Info == nil {
// we may back up empty database.
continue
}
tablesMap[utils.EncloseDBAndTable(dbName, table.Info.Name.L)] = struct{}{}
}
}
// check whether the explicit schema/table filters match
restoreSchemas := cfg.Schemas
restoreTables := cfg.Tables
for schema := range restoreSchemas {
schemaLName := strings.ToLower(schema)
if _, ok := schemasMap[schemaLName]; !ok {
return errors.Annotatef(berrors.ErrUndefinedRestoreDbOrTable,
"[database: %v] has not been backup, please ensure you has input a correct database name", schema)
}
}
for table := range restoreTables {
tableLName := strings.ToLower(table)
if _, ok := tablesMap[tableLName]; !ok {
return errors.Annotatef(berrors.ErrUndefinedRestoreDbOrTable,
"[table: %v] has not been backup, please ensure you has input a correct table name", table)
}
}
return nil
}
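// isFullRestore checks whether the command is `restore full`.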
func isFullRestore(cmdName string) bool {
return cmdName == FullRestoreCmd
}
// IsStreamRestore checks the command is `restore point`
func IsStreamRestore(cmdName string) bool {
return cmdName == PointRestoreCmd
}
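// registerTaskToPD registers the restore task to PD through etcd and returns a function that closes the registration.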
func registerTaskToPD(ctx context.Context, etcdCLI *clientv3.Client) (closeF func(context.Context) error, err error) {
register := utils.NewTaskRegister(etcdCLI, utils.RegisterRestore, fmt.Sprintf("restore-%s", uuid.New()))
err = register.RegisterTask(ctx)
return register.Close, errors.Trace(err)
}
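// DefaultRestoreConfig returns a RestoreConfig populated with the default flag values, using the given common config.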
func DefaultRestoreConfig(commonConfig Config) RestoreConfig {
fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError)
DefineRestoreFlags(fs)
cfg := RestoreConfig{}
err := cfg.ParseFromFlags(fs, true)
if err != nil {
log.Panic("failed to parse restore flags to config", zap.Error(err))
}
cfg.Config = commonConfig
return cfg
}
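// printRestoreMetrics logs the histograms of the restore-related metrics collected during the task.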
func printRestoreMetrics() {
log.Info("Metric: import_file_seconds", zap.Object("metric", logutil.MarshalHistogram(metrics.RestoreImportFileSeconds)))
log.Info("Metric: upload_sst_for_pitr_seconds", zap.Object("metric", logutil.MarshalHistogram(metrics.RestoreUploadSSTForPiTRSeconds)))
log.Info("Metric: upload_sst_meta_for_pitr_seconds", zap.Object("metric", logutil.MarshalHistogram(metrics.RestoreUploadSSTMetaForPiTRSeconds)))
}
// RunRestore starts a restore task inside the current goroutine.
func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) (restoreErr error) {
etcdCLI, err := dialEtcdWithCfg(c, cfg.Config)
if err != nil {
return err
}
defer func() {
if err := etcdCLI.Close(); err != nil {
log.Error("failed to close the etcd client", zap.Error(err))
}
}()
logTaskBackend, err := checkConflictingLogBackup(c, cfg, IsStreamRestore(cmdName), etcdCLI)
if err != nil {
return errors.Annotate(err, "failed to check task exists")
}
closeF, err := registerTaskToPD(c, etcdCLI)
if err != nil {
return errors.Annotate(err, "failed to register task to pd")
}
defer func() {
_ = closeF(c)
}()
config.UpdateGlobal(func(conf *config.Config) {
conf.KeyspaceName = cfg.KeyspaceName
})
// TODO: remove version checker from `NewMgr`
mgr, err := NewMgr(c, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, true, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
defer mgr.Close()
defer cfg.CloseCheckpointMetaManager()
defer func() {
if logTaskBackend == nil || restoreErr != nil || cfg.PiTRTableTracker == nil {
return
}
restoreCommitTs, err := restore.GetTSWithRetry(c, mgr.GetPDClient())
if err != nil {
restoreErr = err
return
}
if cfg.tableMappingManager == nil {
log.Error("tableMappingManager is nil, blocklist will contain no IDs")
restoreErr = errors.New("tableMappingManager is nil")
return
}
// Extract downstream IDs from tableMappingManager
// ApplyFilterToDBReplaceMap has already filtered the DBReplaceMap based on PiTRTableTracker,
// so we can directly iterate through it and collect non-filtered IDs
downstreamTableIds := make(map[int64]struct{})
var downstreamDbIds []int64
// Iterate through DBReplaceMap which has already been filtered by ApplyFilterToDBReplaceMap
for _, dbReplace := range cfg.tableMappingManager.DBReplaceMap {
if dbReplace.FilteredOut {
continue
}
// Collect downstream DB ID
downstreamDbIds = append(downstreamDbIds, dbReplace.DbID)
// Iterate through tables in this database
for _, tableReplace := range dbReplace.TableMap {
if tableReplace.FilteredOut {
continue
}
// Collect downstream table ID
downstreamTableIds[tableReplace.TableID] = struct{}{}
// Collect all partition IDs for this table
for _, downstreamPartitionId := range tableReplace.PartitionMap {
downstreamTableIds[downstreamPartitionId] = struct{}{}
}
}
}
restoreStartTs := cfg.RestoreStartTS
if restoreStartTs == 0 {
log.Warn("restoreStartTS is not set, the blocklist will block from 0 to restoreCommitTs")
}
// Convert map to slice for function call
tableIdsSlice := make([]int64, 0, len(downstreamTableIds))
for id := range downstreamTableIds {
tableIdsSlice = append(tableIdsSlice, id)
}
filename, data, err := restore.MarshalLogRestoreTableIDsBlocklistFile(restoreCommitTs, restoreStartTs, cfg.RewriteTS, tableIdsSlice, downstreamDbIds)
if err != nil {
restoreErr = err
return
}
logTaskStorage, err := objstore.Create(c, logTaskBackend, false)
if err != nil {
restoreErr = err
return
}
log.Info("save the log restore table IDs blocklist into log backup storage",
zap.Int("downstreamTableCount", len(downstreamTableIds)),
zap.Int("downstreamDbCount", len(downstreamDbIds)))
if err = logTaskStorage.WriteFile(c, filename, data); err != nil {
restoreErr = err
return
}
}()
if err = g.UseOneShotSession(mgr.GetStorage(), false, func(se glue.Session) error {
enableFollowerHandleRegion, err := se.GetGlobalSysVar(vardef.PDEnableFollowerHandleRegion)
if err != nil {
return err
}
return mgr.SetFollowerHandle(variable.TiDBOptOn(enableFollowerHandleRegion))
}); err != nil {
return errors.Trace(err)
}
defer printRestoreMetrics()
// build restore registry
restoreRegistry, err := registry.NewRestoreRegistry(c, g, mgr.GetDomain())
if err != nil {
return errors.Trace(err)
}
defer restoreRegistry.Close()
cfg.RestoreRegistry = restoreRegistry
if IsStreamRestore(cmdName) {
if err := version.CheckClusterVersion(c, mgr.GetPDClient(), version.CheckVersionForBRPiTR); err != nil {
return errors.Trace(err)
}
restoreErr = RunStreamRestore(c, mgr, g, cfg)
} else {
if err := version.CheckClusterVersion(c, mgr.GetPDClient(), version.CheckVersionForBR); err != nil {
return errors.Trace(err)
}
snapshotRestoreConfig := SnapshotRestoreConfig{
RestoreConfig: cfg,
}
restoreErr = runSnapshotRestore(c, mgr, g, cmdName, &snapshotRestoreConfig)
}
// unregister restore task
failpoint.Inject("fail-at-end-of-restore", func() {
log.Info("failpoint fail-at-end-of-restore injected, failing at the end of restore task",
zap.Error(restoreErr))
restoreErr = errors.New("failpoint: fail-at-end-of-restore")
})
if restoreErr != nil {
// if the error happened during the register phase, no restoreID was generated and the default is 0; or this is an
// old TiDB where we didn't register the task. Either way we don't need to do any extra work.
if cfg.RestoreID == 0 {
return errors.Trace(restoreErr)
}
// if checkpoint is not persisted, let's just unregister the task since we don't need it
checkpointPersisted, err := hasCheckpointPersisted(c, cfg)
if err != nil {
log.Error("failed to check if checkpoint is persisted", zap.Error(err))
// on error, assume checkpoint is not persisted and unregister the task
checkpointPersisted = false
}
if checkpointPersisted {
log.Info("pausing restore task from registry",
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(restoreErr))
if err := restoreRegistry.PauseTask(c, cfg.RestoreID); err != nil {
log.Error("failed to pause restore task from registry",
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(err))
}
} else {
log.Info("unregistering restore task from registry",
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(restoreErr))
if err := restoreRegistry.Unregister(c, cfg.RestoreID); err != nil {
log.Error("failed to unregister restore task from registry",
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(err))
}
}
return errors.Trace(restoreErr)
}
// unregister if restore id is not 0
if cfg.RestoreID != 0 {
// unregister restore task
if err := restoreRegistry.Unregister(c, cfg.RestoreID); err != nil {
log.Warn("failed to unregister restore task from registry",
zap.Uint64("restoreId", cfg.RestoreID), zap.Error(err))
}
}
// Clear the checkpoint data if needed
cleanUpCheckpoints(c, cfg)
return nil
}
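// cleanUpCheckpoints removes the checkpoint data of the log, compacted sst and snapshot restore phases
// when checkpoint mode is enabled.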
func cleanUpCheckpoints(ctx context.Context, cfg *RestoreConfig) {
if cfg.UseCheckpoint {
log.Info("start to remove checkpoint data restore")
if cfg.logCheckpointMetaManager != nil {
err := cfg.logCheckpointMetaManager.RemoveCheckpointData(ctx)
if err != nil {
log.Warn("failed to remove checkpoint data for log restore", zap.Error(err))
}
}
if cfg.sstCheckpointMetaManager != nil {
err := cfg.sstCheckpointMetaManager.RemoveCheckpointData(ctx)
if err != nil {
log.Warn("failed to remove checkpoint data for compacted restore", zap.Error(err))
}
}
// Skip removing snapshot checkpoint data if this is a pure log restore
// (i.e. restoring only from log backup without a base snapshot backup),
// since snapshotCheckpointMetaManager would be nil in that case
if cfg.snapshotCheckpointMetaManager != nil {
err := cfg.snapshotCheckpointMetaManager.RemoveCheckpointData(ctx)
if err != nil {
log.Warn("failed to remove checkpoint data for snapshot restore", zap.Error(err))
}
}
log.Info("all checkpoint data removed.")
} else {
log.Info("checkpoint not enabled, skip to remove checkpoint data")
}
}
// hasCheckpointPersisted checks if there are any checkpoint data persisted in storage or tables
func hasCheckpointPersisted(ctx context.Context, cfg *RestoreConfig) (bool, error) {
if cfg.snapshotCheckpointMetaManager != nil {
exists, err := cfg.snapshotCheckpointMetaManager.ExistsCheckpointMetadata(ctx)
if err != nil {
return false, errors.Trace(err)
}
if exists {
return true, nil
}
}
if cfg.logCheckpointMetaManager != nil {
exists, err := cfg.logCheckpointMetaManager.ExistsCheckpointMetadata(ctx)
if err != nil {
return false, errors.Trace(err)
}
if exists {
return true, nil
}
}
if cfg.sstCheckpointMetaManager != nil {
exists, err := cfg.sstCheckpointMetaManager.ExistsCheckpointMetadata(ctx)
if err != nil {
return false, errors.Trace(err)
}
if exists {
return true, nil
}
}
return false, nil
}
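// SnapshotRestoreConfig wraps RestoreConfig together with the PiTR components that are only set
// when this snapshot restore is the first phase of a point-in-time restore.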
type SnapshotRestoreConfig struct {
*RestoreConfig
piTRTaskInfo *PiTRTaskInfo
logRestoreStorage storeapi.Storage
logTableHistoryManager *stream.LogBackupTableHistoryManager
tableMappingManager *stream.TableMappingManager
}
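// isPiTR reports whether this snapshot restore is part of a PiTR. Either all PiTR components must be set
// or none of them; a partial set is treated as an error.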
func (s *SnapshotRestoreConfig) isPiTR() (bool, error) {
if s.piTRTaskInfo != nil && s.logTableHistoryManager != nil && s.tableMappingManager != nil {
return true, nil
}
if s.piTRTaskInfo == nil && s.logTableHistoryManager == nil && s.tableMappingManager == nil {
return false, nil
}
errMsg := "inconsistent PiTR components detected"
log.Error(errMsg,
zap.Any("piTRTaskInfo", s.piTRTaskInfo),
zap.Any("logTableHistoryManager", s.logTableHistoryManager),
zap.Any("tableMappingManager", s.tableMappingManager))
return false, errors.New(errMsg)
}
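// runSnapshotRestore runs the snapshot restore workflow: it reads the backup meta, checks cluster requirements,
// filters the schemas and tables to restore, creates them in the downstream cluster, and then restores the
// backed-up data, optionally as the first phase of a PiTR.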
func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName string, cfg *SnapshotRestoreConfig) error {
cfg.Adjust()
defer summary.Summary(cmdName)
ctx, cancel := context.WithCancel(c)
defer cancel()
log.Info("starting snapshot restore")
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("task.RunRestore", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
loadSysTablePhysical := cfg.FastLoadSysTables && cfg.WithSysTable
loadStatsPhysical := cfg.FastLoadSysTables && cfg.LoadStats
// check if this is part of the PiTR operation
isPiTR, err := cfg.isPiTR()
if err != nil {
return errors.Trace(err)
}
// read information from the backup meta file and do requirement checking if needed
u, s, backupMeta, err := ReadBackupMeta(ctx, metautil.MetaFile, &cfg.Config)
if err != nil {
return errors.Trace(err)
}
if backupMeta.IsRawKv || backupMeta.IsTxnKv {
return errors.Annotate(berrors.ErrRestoreModeMismatch, "cannot do transactional restore from raw/txn kv data")
}
if cfg.UpstreamClusterID == 0 {
cfg.UpstreamClusterID = backupMeta.ClusterId
}
if loadStatsPhysical || loadSysTablePhysical {
schemaVersionPair := snapclient.SchemaVersionPairT{}
upstreamClusterVersion := version.NormalizeBackupVersion(backupMeta.ClusterVersion)
if upstreamClusterVersion == nil {
log.Warn("The cluster version from backupmeta is invalid. Fallback to logically load system tables.",
zap.String("backupmeta cluster version", backupMeta.ClusterVersion))
loadStatsPhysical = false
loadSysTablePhysical = false
} else {
schemaVersionPair.UpstreamVersionMajor = upstreamClusterVersion.Major
schemaVersionPair.UpstreamVersionMinor = upstreamClusterVersion.Minor
}
downstreamClusterVersionStr, err := mgr.GetClusterVersion(ctx)
if err != nil {
log.Warn("Failed to get the downstream cluster version. Fallback to logically load system tables.",
zap.Error(err))
loadStatsPhysical = false
loadSysTablePhysical = false
} else {
downstreamClusterVersion := version.NormalizeBackupVersion(downstreamClusterVersionStr)
if downstreamClusterVersion == nil {
log.Warn("The downstream cluster version is invalid. Fallback to logically load system tables.",
zap.String("downstream cluster version", downstreamClusterVersionStr))
loadStatsPhysical = false
loadSysTablePhysical = false
} else {
schemaVersionPair.DownstreamVersionMajor = downstreamClusterVersion.Major
schemaVersionPair.DownstreamVersionMinor = downstreamClusterVersion.Minor
}
}
if loadSysTablePhysical {
if schemaVersionPair.UpstreamVersionMajor != schemaVersionPair.DownstreamVersionMajor ||
schemaVersionPair.UpstreamVersionMinor != schemaVersionPair.DownstreamVersionMinor {
log.Warn("Cannot set --fast-load-sys-tables when the cluster version of snapshot backup is different from that of the downstream cluster. "+
"Fallback to logically load system tables.",
zap.String("upstream version", schemaVersionPair.UpstreamVersion()),
zap.String("downstream version", schemaVersionPair.DownstreamVersion()),
)
loadSysTablePhysical = false
}
}
}
if cfg.CheckRequirements {
log.Info("Checking incompatible TiCDC changefeeds before restoring.",
logutil.ShortError(err), zap.Uint64("restore-ts", backupMeta.EndVersion))
if err := checkIncompatibleChangefeed(ctx, backupMeta.EndVersion, mgr.GetDomain().GetEtcdClient()); err != nil {
return errors.Trace(err)
}
backupVersion := version.NormalizeBackupVersion(backupMeta.ClusterVersion)
if backupVersion != nil {
if versionErr := version.CheckClusterVersion(ctx, mgr.GetPDClient(), version.CheckVersionForBackup(backupVersion)); versionErr != nil {
return errors.Trace(versionErr)
}
}
}
if _, err = CheckNewCollationEnable(backupMeta.GetNewCollationsEnabled(), g, mgr.GetStorage(), cfg.CheckRequirements); err != nil {
return errors.Trace(err)
}
// build restore client
// need to retrieve these configs from tikv if not set in command.
kvConfigs := &pconfig.KVConfig{
ImportGoroutines: cfg.ConcurrencyPerStore,
MergeRegionSize: cfg.MergeSmallRegionSizeBytes,
MergeRegionKeyCount: cfg.MergeSmallRegionKeyCount,
}
// according to https://github.com/pingcap/tidb/issues/34167.
// we should get the real config from tikv to adapt the dynamic region.
httpCli := httputil.NewClient(mgr.GetTLSConfig())
mgr.ProcessTiKVConfigs(ctx, kvConfigs, httpCli)
keepaliveCfg := GetKeepalive(&cfg.Config)
keepaliveCfg.PermitWithoutStream = true
client := snapclient.NewRestoreClient(mgr.GetPDClient(), mgr.GetPDHTTPClient(), mgr.GetTLSConfig(), keepaliveCfg)
defer client.Close()
// set to cfg so that restoreStream can use it.
cfg.ConcurrencyPerStore = kvConfigs.ImportGoroutines
// using tikv config to set the concurrency-per-store for client.
client.SetConcurrencyPerStore(cfg.ConcurrencyPerStore.Value)
err = configureRestoreClient(ctx, client, cfg.RestoreConfig)
if err != nil {
return errors.Trace(err)
}
// InitConnections initializes DB connection sessions
err = client.InitConnections(g, mgr.GetStorage())
if err != nil {
return errors.Trace(err)
}
// register restore task if needed
err = RegisterRestoreIfNeeded(ctx, cfg.RestoreConfig, cmdName, mgr.GetDomain())
if err != nil {
return errors.Trace(err)
}
isNextGenRestore := utils.CheckNextGenCompatibility(cfg.KeyspaceName, cfg.CheckRequirements)
if isNextGenRestore {
log.Info("start restore to next-gen cluster")
}
metaReader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
if err = client.LoadSchemaIfNeededAndInitClient(ctx, backupMeta, u, metaReader, cfg.LoadStats, nil, nil,
cfg.ExplicitFilter, isFullRestore(cmdName), cfg.WithSysTable); err != nil {
return errors.Trace(err)
}
if client.IsIncremental() || cfg.ExplicitFilter || !isFullRestore(cmdName) {
if loadStatsPhysical {
log.Warn("Cannot set --fast-load-sys-tables when it is not full restore. Fallback to logically load stats.")
loadStatsPhysical = false
}
if loadSysTablePhysical {
log.Warn("Cannot set --fast-load-sys-tables when it is not full restore. Fallback to logically load system tables.")
loadSysTablePhysical = false
}
}
if client.IsIncremental() {
// don't support checkpoint for the ddl restore
log.Info("the incremental snapshot restore doesn't support checkpoint mode, disable checkpoint.")
cfg.UseCheckpoint = false
}
hash, err := cfg.RestoreConfig.Hash(cmdName)
if err != nil {
return errors.Trace(err)
}
cpEnabledAndExists, err := checkpointEnabledAndExists(ctx, g, cfg, mgr)
if err != nil {
return errors.Trace(err)
}
reusePreallocIDs, err := checkPreallocIDReusable(ctx, cfg, hash, cpEnabledAndExists)
if err != nil {
return errors.Trace(err)
}
log.Info("checkpoint status in restore", zap.Bool("enabled", cfg.UseCheckpoint), zap.Bool("exists", cpEnabledAndExists))
if err := checkMandatoryClusterRequirements(client, cfg, cpEnabledAndExists, cmdName); err != nil {
return errors.Trace(err)
}
// filters out db/table/files using filter
tableMap, dbMap, err := filterRestoreFiles(client, cfg.RestoreConfig, loadStatsPhysical)
if err != nil {
return errors.Trace(err)
}
log.Info("found items to restore after filtering",
zap.Int("tables", len(tableMap)),
zap.Int("db", len(dbMap)))
// only run when this full restore is part of the PiTR
if isPiTR {
snapshotDbMap := client.GetDatabaseMap()
snapshotTableMap := client.GetTableMap()
// adjust tables to restore in the snapshot restore phase since they may later be renamed during
// log restore and fall into or out of the filter range.
err = AdjustTablesToRestoreAndCreateTableTracker(
cfg.logTableHistoryManager,
cfg.RestoreConfig,
snapshotDbMap,
snapshotTableMap,
client.GetPartitionMap(),
tableMap,
dbMap,
)
if err != nil {
return errors.Trace(err)
}
log.Info("adjusted items to restore",
zap.Int("tables", len(tableMap)),
zap.Int("db", len(dbMap)))
tableNameByTableID := func(tableId int64) string {
dbName, tableName, dbID := "", "", int64(0)
history := cfg.logTableHistoryManager.GetTableHistory()
if locations, exists := history[tableId]; exists {
if name, exists := cfg.logTableHistoryManager.GetDBNameByID(locations[1].DbID); exists {
dbName = name
}
dbID = locations[1].DbID
tableName = locations[1].TableName
} else if tableMeta, exists := tableMap[tableId]; exists && tableMeta != nil && tableMeta.Info != nil {
if tableMeta.DB != nil && len(dbName) == 0 {
dbName = tableMeta.DB.Name.O
}
tableName = tableMeta.Info.Name.O
}
if len(dbName) == 0 && dbID > 0 {
if dbInfo, exists := dbMap[dbID]; exists {
dbName = dbInfo.Info.Name.O
}
}
return fmt.Sprintf("%s.%s", dbName, tableName)
}
dbNameByDbId := func(dbId int64) string {
if dbName, ok := cfg.logTableHistoryManager.GetDBNameByID(dbId); ok {
return dbName
}
if dbInfo, ok := dbMap[dbId]; ok && dbInfo != nil && dbInfo.Info != nil {
return dbInfo.Info.Name.O
}
return ""
}
checkTableIdLost := func(tableId int64) bool {
// check whether exists in log backup
if _, exists := cfg.logTableHistoryManager.GetTableHistory()[tableId]; exists {
return false
}
// check whether exists in snapshot backup
if _, exists := snapshotTableMap[tableId]; exists {
return false
}
return true
}
checkDbIdLost := func(dbId int64) bool {
// check whether exists in log backup
if _, exists := cfg.logTableHistoryManager.GetDBNameByID(dbId); exists {
return false
}
// check whether exists in snapshot backup
if _, exists := snapshotDbMap[dbId]; exists {
return false
}
return true
}
if err := restore.CheckTableTrackerContainsTableIDsFromBlocklistFiles(
ctx, cfg.logRestoreStorage, cfg.PiTRTableTracker, backupMeta.GetEndVersion(), cfg.piTRTaskInfo.RestoreTS,
tableNameByTableID, dbNameByDbId, checkTableIdLost, checkDbIdLost, cfg.tableMappingManager.CleanError,
); err != nil {
return errors.Trace(err)
}
if err := cfg.tableMappingManager.ReportIfError(); err != nil {
return errors.Trace(err)
}
}
tables := utils.Values(tableMap)
dbs := utils.Values(dbMap)
// check if tables and dbs are valid to continue
if cfg.RestoreRegistry != nil && cfg.RestoreID != 0 {
log.Info("checking ongoing conflicting restore task using restore registry",
zap.Int("tables_count", len(tables)),
zap.Uint64("current_restore_id", cfg.RestoreID))
err := cfg.RestoreRegistry.CheckTablesWithRegisteredTasks(ctx, cfg.RestoreID, cfg.PiTRTableTracker, tables)
if err != nil {
return errors.Trace(err)
}
}
setTablesRestoreModeIfNeeded(tables, cfg, isPiTR, client.IsIncremental())
archiveSize := metautil.ArchiveTablesSize(tables)
// some more checks once we get tables and files information
if err := checkOptionalClusterRequirements(ctx, client, cfg, cpEnabledAndExists, mgr, tables, archiveSize, isPiTR); err != nil {
return errors.Trace(err)
}
g.Record(summary.RestoreDataSize, archiveSize)
// restore from tidb will fetch a general Size, see issue https://github.com/pingcap/tidb/issues/27247
g.Record("Size", archiveSize)
restoreTS, err := restore.GetTSWithRetry(ctx, mgr.GetPDClient())
if err != nil {
return errors.Trace(err)
}
if client.IsFullClusterRestore() && client.HasBackedUpSysDB() {
canLoadSysTablePhysical, err := snapclient.CheckSysTableCompatibility(mgr.GetDomain(), tables, client.GetCheckPrivilegeTableRowsCollateCompatiblity())
if err != nil {
return errors.Trace(err)
}
if loadSysTablePhysical && !canLoadSysTablePhysical {
log.Info("The system tables schema is not compatible. Fallback to logically load system tables.")
loadSysTablePhysical = false
}
if client.GetCheckPrivilegeTableRowsCollateCompatiblity() && canLoadSysTablePhysical {
log.Info("The system tables schema match so no need to set sys check collation")
client.SetCheckPrivilegeTableRowsCollateCompatiblity(false)
}
}
// preallocate the table id, because any ddl job or database creation (including checkpoint) also allocates the global ID
if userTableIDNotReusedWhenNeedCheck, err := client.AllocTableIDs(ctx, tables, loadStatsPhysical, loadSysTablePhysical, reusePreallocIDs); err != nil {
return errors.Trace(err)
} else if userTableIDNotReusedWhenNeedCheck {
log.Warn("Cannot load stats physically because not all table ids are reused. Fallback to logically load stats.")
// Notice that it will break the pitr id tracker since log restore supports system restore
tables = fallbackStatsTables(tables)
loadStatsPhysical = false
}
tables = client.CleanTablesIfTemporarySystemTablesRenamed(loadStatsPhysical, loadSysTablePhysical, tables)
preAllocRange, err := client.GetPreAllocedTableIDRange()
if err != nil {
return errors.Trace(err)
}
importModeSwitcher := restore.NewImportModeSwitcher(mgr.GetPDClient(), cfg.Config.SwitchModeInterval, mgr.GetTLSConfig())
var restoreSchedulersFunc pdutil.UndoFunc
var schedulersConfig *pdutil.ClusterConfig
if (isFullRestore(cmdName) && !cfg.ExplicitFilter) || client.IsIncremental() {
restoreSchedulersFunc, schedulersConfig, err = restore.RestorePreWork(ctx, mgr, importModeSwitcher, cfg.Online, true)
} else {
if isPiTR && cfg.tableMappingManager != nil {
cfg.tableMappingManager.SetPreallocatedRange(preAllocRange[0], preAllocRange[1])
}
keyRange := rewriteKeyRanges(preAllocRange)
restoreSchedulersFunc, schedulersConfig, err = restore.FineGrainedRestorePreWork(
ctx, mgr, importModeSwitcher, keyRange, cfg.Online, true)
}
if err != nil {
return errors.Trace(err)
}
// need to know whether restore has been completed so we can restore schedulers
canRestoreSchedulers := false
defer func() {
// don't reset pd scheduler if checkpoint mode is used and restore is not finished
if cfg.UseCheckpoint && !canRestoreSchedulers {
log.Info("skip removing pd scheduler for next retry")
return
}
log.Info("start to restore pd scheduler")
// run the post-work to avoid being stuck in the import
// mode or emptied schedulers.
restore.RestorePostWork(ctx, importModeSwitcher, restoreSchedulersFunc, cfg.Online)
log.Info("finish restoring pd scheduler")
}()
// reload or register the checkpoint
var checkpointSetWithTableID map[int64]map[string]struct{}
if cfg.UseCheckpoint {
logRestoredTS := uint64(0)
if cfg.piTRTaskInfo != nil {
logRestoredTS = cfg.piTRTaskInfo.RestoreTS
}
sets, restoreSchedulersConfigFromCheckpoint, newRestoreStartTS, err := client.InitCheckpoint(
ctx, cfg.snapshotCheckpointMetaManager, schedulersConfig, cfg.RestoreStartTS, logRestoredTS, hash, cpEnabledAndExists)
if err != nil {
return errors.Trace(err)
}
log.Info("try to reuse restore start timestamp for blocklist",
zap.Uint64("new restore start ts", newRestoreStartTS),
zap.Uint64("old restore start ts", cfg.RestoreStartTS),
)
cfg.RestoreStartTS = newRestoreStartTS
if restoreSchedulersConfigFromCheckpoint != nil {
// The last range rule will be dropped when the last restore quits.
restoreSchedulersConfigFromCheckpoint.RuleID = schedulersConfig.RuleID
restoreSchedulersFunc = mgr.MakeUndoFunctionByConfig(*restoreSchedulersConfigFromCheckpoint)
}
checkpointSetWithTableID = sets
defer func() {
// need to flush the whole checkpoint data so that br can quickly jump to
// the log kv restore step on the next retry.
log.Info("wait for flush checkpoint...")
client.WaitForFinishCheckpoint(ctx, len(cfg.FullBackupStorage) > 0 || !canRestoreSchedulers)
}()
}
err = client.InstallPiTRSupport(ctx, snapclient.PiTRCollDep{
PDCli: mgr.GetPDClient(),
EtcdCli: mgr.GetDomain().GetEtcdClient(),
Storage: util.ProtoV1Clone(u),
})
if err != nil {
return errors.Trace(err)
}
sp := utils.BRServiceSafePoint{
BackupTS: restoreTS,
TTL: utils.DefaultBRGCSafePointTTL,
ID: utils.MakeSafePointID(),
}
g.Record("BackupTS", backupMeta.EndVersion)
g.Record("RestoreTS", restoreTS)
cctx, gcSafePointKeeperCancel := context.WithCancel(ctx)
defer func() {
log.Info("start to remove gc-safepoint keeper")
// close the gc safe point keeper at first
gcSafePointKeeperCancel()
// set the ttl to 0 to remove the gc-safe-point
sp.TTL = 0
if err := utils.UpdateServiceSafePoint(ctx, mgr.GetPDClient(), sp); err != nil {
log.Warn("failed to update service safe point, backup may fail if gc triggered",
zap.Error(err),
)
}
log.Info("finish removing gc-safepoint keeper")
}()
// the restore checksum will check the safe point with its start ts, see details at
// https://github.com/pingcap/tidb/blob/180c02127105bed73712050594da6ead4d70a85f/store/tikv/kv.go#L186-L190
// so we should keep the safe point unchanged, to avoid the GC lifetime being shorter than the transaction duration.
err = utils.StartServiceSafePointKeeper(cctx, mgr.GetPDClient(), sp)
if err != nil {
return errors.Trace(err)
}
ddlJobs := FilterDDLJobs(client.GetDDLJobs(), tables)
ddlJobs = FilterDDLJobByRules(ddlJobs, DDLJobBlockListRule)
if cfg.AllowPITRFromIncremental {
err = CheckDDLJobByRules(ddlJobs, DDLJobLogIncrementalCompactBlockListRule)
if err != nil {
return errors.Trace(err)
}
}
err = PreCheckTableTiFlashReplica(ctx, mgr.GetPDClient(), tables, cfg.tiflashRecorder, isNextGenRestore)
if err != nil {
return errors.Trace(err)
}
err = PreCheckTableClusterIndex(tables, ddlJobs, mgr.GetDomain())
if err != nil {
return errors.Trace(err)
}
// pre-set TiDB config for restore
restoreDBConfig := tweakLocalConfForRestore()
defer restoreDBConfig()
if client.GetSupportPolicy() {
// create policy if backupMeta has policies.
policies, err := client.GetPlacementPolicies()
if err != nil {
return errors.Trace(err)
}
if isFullRestore(cmdName) {
// we should restore all policies during full restoration.
err = client.CreatePolicies(ctx, policies)
if err != nil {
return errors.Trace(err)
}
} else {
client.SetPolicyMap(policies)
}
}
// execute DDL first
if err = client.ExecDDLs(ctx, ddlJobs); err != nil {
return errors.Trace(err)
}
// nothing to restore, maybe only ddl changes in incremental restore
if len(dbs) == 0 && len(tables) == 0 {
log.Info("nothing to restore, all databases and tables are filtered out")
// even with nothing to restore, we show a success message since there is no failure.
}
createdTables, err := createDBsAndTables(ctx, client, cfg, mgr, dbs, tables)
if err != nil {
return errors.Trace(err)
}
/* failpoint */
failpoint.Inject("sleep_for_check_scheduler_status", func(val failpoint.Value) {
fileName, ok := val.(string)
func() {
if !ok {
return
}
_, osErr := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if osErr != nil {
log.Warn("failed to create file", zap.Error(osErr))
return
}
}()
for {
_, statErr := os.Stat(fileName)
if os.IsNotExist(statErr) {
break
} else if statErr != nil {
log.Warn("error checking file", zap.Error(statErr))
break
}
time.Sleep(300 * time.Millisecond)
}
})
/* failpoint */
// update table mapping manager with new table ids if PiTR
if isPiTR {
if err = cfg.tableMappingManager.UpdateDownstreamIds(dbs, createdTables, client.GetDomain()); err != nil {
return errors.Trace(err)
}
log.Info("updated table mapping manager after creating tables")
}
anyFileKey := getAnyFileKeyFromTables(tables)
if len(anyFileKey) == 0 {
log.Info("no files, empty databases and tables are restored")
summary.SetSuccessStatus(true)
// don't return immediately, wait for the whole pipeline to finish.
} else {
codec := mgr.GetStorage().GetCodec()
oldKeyspace, _, err := tikv.DecodeKey(anyFileKey, backupMeta.ApiVersion)
if err != nil {
return errors.Trace(err)
}
newKeyspace := codec.GetKeyspace()
// If the API V2 data occurs in the restore process, the cluster must
// support the keyspace rewrite mode.
if (len(oldKeyspace) > 0 || len(newKeyspace) > 0) && client.GetRewriteMode() == snapclient.RewriteModeLegacy {
return errors.Annotate(berrors.ErrRestoreModeMismatch, "cluster only supports legacy rewrite mode")
}
// Hijack the tableStream and rewrite the rewrite rules.
for _, createdTable := range createdTables {
// Set the keyspace info for the checksum requests
createdTable.RewriteRule.OldKeyspace = oldKeyspace
createdTable.RewriteRule.NewKeyspace = newKeyspace
for _, rule := range createdTable.RewriteRule.Data {
rule.OldKeyPrefix = slices.Concat(oldKeyspace, rule.OldKeyPrefix)
rule.NewKeyPrefix = codec.EncodeKey(rule.NewKeyPrefix)
}
}
}
if cfg.tiflashRecorder != nil {
for _, createdTable := range createdTables {
cfg.tiflashRecorder.Rewrite(createdTable.OldTable.Info.ID, createdTable.Table.ID)
}
}
// Do not reset timestamp if we are doing incremental restore, because
// we are not allowed to decrease timestamp.
if !client.IsIncremental() {
if err = client.ResetTS(ctx, mgr.PdController); err != nil {
log.Error("reset pd TS failed", zap.Error(err))
return errors.Trace(err)
}
}
compactProtectStartKey, compactProtectEndKey := encodeCompactAndCheckKey(mgr.GetStorage().GetCodec(), preAllocRange)
rtCtx := snapclient.RestoreTablesContext{
LogProgress: cfg.LogProgress,
SplitSizeBytes: kvConfigs.MergeRegionSize.Value,
SplitKeyCount: kvConfigs.MergeRegionKeyCount.Value,
// If the command comes from the BR binary, ddl.EnableSplitTableRegion is always 0.
// If the command comes from BRIE SQL, ddl.EnableSplitTableRegion follows the TiDB config split-table.
// Notice that the TiKV config `split-region-on-table` only splits regions that already contain data, so it may take effect after the restore is done.
// It's recommended to enable the TiDB config `split-table` instead.
SplitOnTable: atomic.LoadUint32(&ddl.EnableSplitTableRegion) == 1,
Online: cfg.Online,
CreatedTables: createdTables,
CheckpointSetWithTableID: checkpointSetWithTableID,
CompactProtectStartKey: compactProtectStartKey,
CompactProtectEndKey: compactProtectEndKey,
Glue: g,
}
if err := client.RestoreTables(ctx, rtCtx); err != nil {
return errors.Trace(err)
}
plCtx := snapclient.PipelineContext{
// run checksum in the pipeline only when it's enabled and this is not an incremental snapshot restore,
// because an incremental backup doesn't carry enough information in its backup meta to validate the checksum
Checksum: cfg.Checksum && !client.IsIncremental(),
LoadStats: cfg.LoadStats,
LoadStatsPhysical: loadStatsPhysical,
LoadSysTablePhysical: loadSysTablePhysical,
WaitTiflashReady: cfg.WaitTiflashReady,
LogProgress: cfg.LogProgress,
ChecksumConcurrency: cfg.ChecksumConcurrency,
StatsConcurrency: cfg.StatsConcurrency,
RestoreTS: restoreTS,
KvClient: mgr.GetStorage().GetClient(),
ExtStorage: s,
Glue: g,
}
// Do some work in the pipeline, such as checksum, loading stats and waiting for tiflash to be ready.
if err := client.RestorePipeline(ctx, plCtx, createdTables); err != nil {
return errors.Trace(err)
}
// The cost of renaming user tables / replacing into system tables isn't high,
// so leave it out of the pipeline for a simpler implementation.
log.Info("restoring system schemas", zap.Bool("withSys", cfg.WithSysTable),
zap.Strings("filter", cfg.FilterStr))
err = client.RestoreSystemSchemas(ctx, cfg.TableFilter, loadSysTablePhysical)
if err != nil {
return errors.Trace(err)
}
failpoint.InjectCall("run-snapshot-restore-about-to-finish", &err)
if err != nil {
return err
}
canRestoreSchedulers = true
log.Info("snapshot restore success")
// Set task summary to success status.
summary.SetSuccessStatus(true)
return nil
}
func checkPreallocIDReusable(ctx context.Context, cfg *SnapshotRestoreConfig, hash []byte, checkpointExist bool) (prealloc *checkpoint.PreallocIDs, err error) {
if !checkpointExist {
return nil, nil
}
checkpointMeta, err := cfg.snapshotCheckpointMetaManager.LoadCheckpointMetadata(ctx)
if err != nil {
return nil, errors.Trace(err)
}
log.Info("checkpoint metadata exists", zap.String("hash", hex.EncodeToString(hash)), zap.String("checkpoint hash", hex.EncodeToString(checkpointMeta.Hash)))
if !bytes.Equal(checkpointMeta.Hash, hash) {
return nil, errors.Trace(errors.Annotatef(berrors.ErrRestoreCheckpointMismatch, "checkpoint hash mismatch, please use the same setting as the previous restore"))
}
return checkpointMeta.PreallocIDs, nil
}
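// getMaxReplica fetches the replication config from PD (with an aggressive
// retry backoff) and returns its "max-replicas" value.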
func getMaxReplica(ctx context.Context, mgr *conn.Mgr) (cnt uint64, err error) {
var resp map[string]any
err = utils.WithRetry(ctx, func() error {
resp, err = mgr.GetPDHTTPClient().GetReplicateConfig(ctx)
return err
}, utils.NewAggressivePDBackoffStrategy())
if err != nil {
return 0, errors.Trace(err)
}
key := "max-replicas"
val, ok := resp[key]
if !ok {
return 0, errors.Errorf("key %s not found in response %v", key, resp)
}
return uint64(val.(float64)), nil
}
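// getStores fetches the stores info from PD (with an aggressive retry backoff).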
func getStores(ctx context.Context, mgr *conn.Mgr) (stores *http.StoresInfo, err error) {
err = utils.WithRetry(ctx, func() error {
stores, err = mgr.GetPDHTTPClient().GetStores(ctx)
return err
}, utils.NewAggressivePDBackoffStrategy())
if err != nil {
return nil, errors.Trace(err)
}
return stores, nil
}
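// EstimateTikvUsage estimates the space the restored data will occupy on each
// TiKV store, assuming the data is evenly distributed across stores:
//
//	usage = archiveSize * min(replicaCnt, storeCnt) / storeCnt
//
// e.g. a 300 GiB archive with 3 replicas on 5 TiKV stores needs roughly
// 300*3/5 = 180 GiB per store.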
func EstimateTikvUsage(archiveSize uint64, replicaCnt uint64, storeCnt uint64) uint64 {
if storeCnt == 0 {
return 0
}
if replicaCnt > storeCnt {
replicaCnt = storeCnt
}
log.Info("estimate tikv usage", zap.Uint64("total size", archiveSize), zap.Uint64("replicaCnt", replicaCnt), zap.Uint64("store count", storeCnt))
return archiveSize * replicaCnt / storeCnt
}
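// EstimateTiflashUsage estimates the space the restored data will occupy on
// each TiFlash store. It sums the archive size of every table that has TiFlash
// replicas, weighted by its replica count, and divides the total by the
// TiFlash store count.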
func EstimateTiflashUsage(tables []*metautil.Table, storeCnt uint64) uint64 {
if storeCnt == 0 {
return 0
}
tiflashTotal := uint64(0)
for _, table := range tables {
if table.Info.TiFlashReplica == nil || table.Info.TiFlashReplica.Count <= 0 {
continue
}
tableBytes := metautil.ArchiveTableSize(table)
tiflashTotal += tableBytes * table.Info.TiFlashReplica.Count
}
log.Info("estimate tiflash usage", zap.Uint64("total size", tiflashTotal), zap.Uint64("store count", storeCnt))
return tiflashTotal / storeCnt
}
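// CheckStoreSpace returns an error if the store reports an invalid available
// space, or if its available space is less than the necessary bytes.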
func CheckStoreSpace(necessary uint64, store *http.StoreInfo) error {
available, err := units.RAMInBytes(store.Status.Available)
if err != nil {
return errors.Annotatef(berrors.ErrPDInvalidResponse, "store %d has invalid available space %s", store.Store.ID, store.Status.Available)
}
if available <= 0 {
return errors.Annotatef(berrors.ErrPDInvalidResponse, "store %d has invalid available space %s", store.Store.ID, store.Status.Available)
}
if uint64(available) < necessary {
return errors.Annotatef(berrors.ErrKVDiskFull, "store %d has no space left on device, available %s, necessary %s",
store.Store.ID, units.BytesSize(float64(available)), units.BytesSize(float64(necessary)))
}
return nil
}
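// checkDiskSpace estimates the space the restore will occupy on TiKV and
// TiFlash stores (with a 1.1x / 1.4x safety margin respectively) and verifies,
// with retries, that every store has enough available space.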
func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, tables []*metautil.Table, archiveSize uint64) error {
maxReplica, err := getMaxReplica(ctx, mgr)
if err != nil {
return errors.Trace(err)
}
stores, err := getStores(ctx, mgr)
if err != nil {
return errors.Trace(err)
}
var tikvCnt, tiflashCnt uint64 = 0, 0
for i := range stores.Stores {
store := &stores.Stores[i]
if engine.IsTiFlashHTTPResp(&store.Store) {
tiflashCnt += 1
continue
}
tikvCnt += 1
}
// We won't need to restore more than 1800 PB data at one time, right?
preserve := func(base uint64, ratio float32) uint64 {
if base > 1000*units.PB {
return base
}
return base * uint64(ratio*10) / 10
}
// The preserve rate for tikv is quite accurate, while the rate for tiflash is a
// number calculated from tpcc testing with variable data sizes. 1.4 is a
// relatively conservative value.
tikvUsage := preserve(EstimateTikvUsage(archiveSize, maxReplica, tikvCnt), 1.1)
tiflashUsage := preserve(EstimateTiflashUsage(tables, tiflashCnt), 1.4)
log.Info("preserved disk space", zap.Uint64("tikv", tikvUsage), zap.Uint64("tiflash", tiflashUsage))
err = utils.WithRetry(ctx, func() error {
stores, err = getStores(ctx, mgr)
if err != nil {
return errors.Trace(err)
}
for _, store := range stores.Stores {
if engine.IsTiFlashHTTPResp(&store.Store) {
if err := CheckStoreSpace(tiflashUsage, &store); err != nil {
return errors.Trace(err)
}
continue
}
if err := CheckStoreSpace(tikvUsage, &store); err != nil {
return errors.Trace(err)
}
}
return nil
}, utils.NewDiskCheckBackoffStrategy())
if err != nil {
return errors.Trace(err)
}
return nil
}
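// checkTableExistence returns an error listing the backup tables that already
// exist in the target cluster.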
func checkTableExistence(ctx context.Context, mgr *conn.Mgr, tables []*metautil.Table) error {
message := "table already exists: "
allUnique := true
for _, table := range tables {
_, err := mgr.GetDomain().InfoSchema().TableByName(ctx, table.DB.Name, table.Info.Name)
if err == nil {
message += fmt.Sprintf("%s.%s ", table.DB.Name, table.Info.Name)
allUnique = false
} else if !infoschema.ErrTableNotExists.Equal(err) {
return errors.Trace(err)
}
}
if !allUnique {
return errors.Annotate(berrors.ErrTablesAlreadyExisted, message)
}
return nil
}
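// getAnyFileKeyFromTables returns the start key of any data file belonging to
// the given tables, or nil if no file has a non-empty start key. The caller
// uses it to detect whether there is anything to restore and which keyspace
// the backup data belongs to.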
func getAnyFileKeyFromTables(tables []*metautil.Table) []byte {
for _, table := range tables {
for _, files := range table.FilesOfPhysicals {
for _, f := range files {
if len(f.StartKey) > 0 {
return f.StartKey
}
}
}
}
return nil
}
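// fallbackStatsTables removes the temporary stats tables from the table list
// so that stats are loaded logically instead of being restored physically.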
func fallbackStatsTables(tables []*metautil.Table) []*metautil.Table {
newTables := make([]*metautil.Table, 0, len(tables))
for _, table := range tables {
if snapclient.IsStatsTemporaryTable(table.DB.Name.O, table.Info.Name.O) {
continue
}
newTables = append(newTables, table)
}
return newTables
}
// filterRestoreFiles filters the backup databases and tables, keeping only those that should be restored.
func filterRestoreFiles(
client *snapclient.SnapClient,
cfg *RestoreConfig,
loadStatsPhysical bool,
) (tableMap map[int64]*metautil.Table, dbMap map[int64]*metautil.Database, err error) {
tableMap = make(map[int64]*metautil.Table)
dbMap = make(map[int64]*metautil.Database)
for _, db := range client.GetDatabases() {
dbName := db.Info.Name.O
if checkpoint.IsCheckpointDB(dbName) {
continue
}
if !loadStatsPhysical && !utils.MatchSchema(cfg.TableFilter, dbName, cfg.WithSysTable) {
continue
}
dbMap[db.Info.ID] = db
for _, table := range db.Tables {
if table.Info == nil {
continue
}
if !(loadStatsPhysical && snapclient.IsStatsTemporaryTable(dbName, table.Info.Name.O)) {
if !utils.MatchTable(cfg.TableFilter, dbName, table.Info.Name.O, cfg.WithSysTable) {
continue
}
}
// Add table to tableMap using table ID as key
tableMap[table.Info.ID] = table
}
}
// sanity check
if len(dbMap) == 0 && len(tableMap) != 0 {
err = errors.Annotate(berrors.ErrRestoreInvalidBackup, "contains tables but no databases")
}
return
}
// getDBNameFromBackup gets database name from either snapshot or log backup history
func getDBNameFromBackup(
dbID int64,
snapshotDBMap map[int64]*metautil.Database,
logBackupTableHistory *stream.LogBackupTableHistoryManager,
) (dbName string, exists bool) {
// check in snapshot
if snapDb, exists := snapshotDBMap[dbID]; exists {
return snapDb.Info.Name.O, true
}
// check during log backup
if name, exists := logBackupTableHistory.GetDBNameByID(dbID); exists {
return name, true
}
log.Warn("did not find db id in full/log backup, "+
"likely different filters are specified for full/log backup and restore, ignoring this db",
zap.Any("dbId", dbID))
return "", false
}
// processLogBackupTableHistory processes table-level operations (renames) and determines which tables need to be restored
func processLogBackupTableHistory(
history *stream.LogBackupTableHistoryManager,
snapshotDBMap map[int64]*metautil.Database,
snapshotTableMap map[int64]*metautil.Table,
partitionMap map[int64]*stream.TableLocationInfo,
cfg *RestoreConfig,
existingTableMap map[int64]*metautil.Table,
existingDBMap map[int64]*metautil.Database,
pitrIdTracker *utils.PiTRIdTracker,
) {
for tableId, dbIDAndTableName := range history.GetTableHistory() {
start := buildStartTableLocationInfo(tableId, &dbIDAndTableName[0], snapshotTableMap, partitionMap)
end := &dbIDAndTableName[1]
endDBName, exists := getDBNameFromBackup(end.DbID, snapshotDBMap, history)
if !exists {
continue
}
endMatches := utils.MatchTable(cfg.TableFilter, endDBName, end.TableName, cfg.WithSysTable)
// if end matches, add to tracker
if endMatches {
if !end.IsPartition {
pitrIdTracker.TrackTableId(end.DbID, tableId)
// used to check whether the existing cluster already has the same table so we can error out
pitrIdTracker.TrackTableName(endDBName, end.TableName)
log.Info("tracking table", zap.Int64("schemaID", end.DbID),
zap.Int64("tableID", tableId), zap.String("tableName", end.TableName))
} else {
// only used for partition violation checking later
pitrIdTracker.TrackPartitionId(tableId)
}
}
// skip if partition
if start.IsPartition || end.IsPartition {
continue
}
_, isStartInSnap := snapshotTableMap[tableId]
// no need to adjust tables if start is not in snapshot
if !isStartInSnap {
continue
}
startDBName, exists := getDBNameFromBackup(start.DbID, snapshotDBMap, history)
if !exists {
continue
}
startMatches := utils.MatchTable(cfg.TableFilter, startDBName, start.TableName, cfg.WithSysTable)
// skip if neither matches
if !startMatches && !endMatches {
continue
}
// skip if both match and nothing changes
if startMatches && endMatches {
// skip if both match and in same db
if start.DbID == end.DbID {
continue
}
}
// only three cases are left at this point
// 1. both match but start and end are not in same DB due to rename
// 2. end matches but start doesn't -> need to restore (add)
// 3. start matches but end doesn't -> need to remove
if startDB, exists := snapshotDBMap[start.DbID]; exists {
for _, table := range startDB.Tables {
if table.Info != nil && table.Info.ID == tableId {
if endMatches {
log.Info("table renamed into the filter, adding this table",
zap.Int64("table_id", tableId),
zap.String("table_name", table.Info.Name.O))
existingTableMap[tableId] = table
existingDBMap[start.DbID] = startDB
} else if startMatches {
log.Info("table renamed out of filter, removing this table",
zap.Int64("table_id", tableId),
zap.String("table_name", table.Info.Name.O))
delete(existingTableMap, table.Info.ID)
}
break
}
}
}
}
}
// shouldRestoreTable checks if a table or partition is being tracked for restore
func shouldRestoreTable(
physicalId int64,
locationInfo *stream.TableLocationInfo,
cfg *RestoreConfig,
) bool {
// if it is a partition, check whether its parent table is included in the restore
if locationInfo.IsPartition {
return cfg.PiTRTableTracker.ContainsTableId(locationInfo.ParentTableID) ||
cfg.PiTRTableTracker.ContainsPartitionId(locationInfo.ParentTableID)
}
// if it is a table, check whether it will be restored
return cfg.PiTRTableTracker.ContainsTableId(physicalId) || cfg.PiTRTableTracker.ContainsPartitionId(physicalId)
}
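// buildStartTableLocationInfo returns the start location for a physical ID,
// preferring the partition map, then the snapshot table map, and falling back
// to the start location recorded in the log backup history.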
func buildStartTableLocationInfo(
physicalID int64,
start *stream.TableLocationInfo,
snapshotTableMap map[int64]*metautil.Table,
partitionMap map[int64]*stream.TableLocationInfo) *stream.TableLocationInfo {
if partitionInfo, exist := partitionMap[physicalID]; exist {
return partitionInfo
}
if tableInfo, exist := snapshotTableMap[physicalID]; exist {
return &stream.TableLocationInfo{
DbID: tableInfo.DB.ID,
TableName: tableInfo.Info.Name.O,
IsPartition: false,
ParentTableID: 0,
}
}
return start
}
// checkPartitionExchangesViolations checks for partition exchanges and returns an error if a partition
// was exchanged between tables where one is in the filter and one is not
func checkPartitionExchangesViolations(
history *stream.LogBackupTableHistoryManager,
snapshotDBMap map[int64]*metautil.Database,
snapshotTableMap map[int64]*metautil.Table,
partitionMap map[int64]*stream.TableLocationInfo,
cfg *RestoreConfig,
) error {
for tableId, dbIDAndTableName := range history.GetTableHistory() {
start := &dbIDAndTableName[0]
end := &dbIDAndTableName[1]
// need to use snapshot start if exists
start = buildStartTableLocationInfo(tableId, start, snapshotTableMap, partitionMap)
// skip if neither is a partition; tables are handled in the previous step
if !start.IsPartition && !end.IsPartition {
continue
}
// skip if the parent table IDs are the same (for a table, the parent table ID is 0)
if start.ParentTableID == end.ParentTableID {
continue
}
restoreStart := shouldRestoreTable(tableId, start, cfg)
restoreEnd := shouldRestoreTable(tableId, end, cfg)
// error out if a partition was exchanged between a table that should be restored and one that shouldn't
if restoreStart != restoreEnd {
startDBName, exists := getDBNameFromBackup(start.DbID, snapshotDBMap, history)
if !exists {
startDBName = fmt.Sprintf("(unknown db name %d)", start.DbID)
}
endDBName, exists := getDBNameFromBackup(end.DbID, snapshotDBMap, history)
if !exists {
endDBName = fmt.Sprintf("(unknown db name %d)", end.DbID)
}
return errors.Annotatef(berrors.ErrRestoreModeMismatch,
"partition exchange detected: partition ID %d was exchanged between table '%s.%s' (ID: %d) "+
" and table '%s.%s' (ID: %d), but only one table will be restored (restoreStart=%v, restoreEnd=%v).",
tableId, startDBName, start.TableName, start.ParentTableID,
endDBName, end.TableName, end.ParentTableID, restoreStart, restoreEnd)
}
}
return nil
}
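// AdjustTablesToRestoreAndCreateTableTracker builds the PiTR ID tracker for the
// restore, tracks newly created databases that match the filter, adjusts the
// snapshot tables to restore according to renames found in the log backup
// history, tracks all remaining snapshot tables, and finally checks for illegal
// partition exchanges.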
func AdjustTablesToRestoreAndCreateTableTracker(
logBackupTableHistory *stream.LogBackupTableHistoryManager,
cfg *RestoreConfig,
snapshotDBMap map[int64]*metautil.Database,
snapshotTableMap map[int64]*metautil.Table,
partitionMap map[int64]*stream.TableLocationInfo,
tableMap map[int64]*metautil.Table,
DBMap map[int64]*metautil.Database,
) (err error) {
// build tracker for pitr restore to use later
piTRIdTracker := utils.NewPiTRIdTracker()
cfg.PiTRTableTracker = piTRIdTracker
// track newly created databases
newlyCreatedDBs := logBackupTableHistory.GetNewlyCreatedDBHistory()
for dbId, dbName := range newlyCreatedDBs {
if utils.MatchSchema(cfg.TableFilter, dbName, cfg.WithSysTable) {
piTRIdTracker.AddDB(dbId)
}
}
// first handle table renames to determine which tables we need
processLogBackupTableHistory(logBackupTableHistory, snapshotDBMap, snapshotTableMap,
partitionMap, cfg, tableMap, DBMap, piTRIdTracker)
// track all snapshot tables that are going to be restored in the PiTR tracker
for tableID, table := range tableMap {
piTRIdTracker.TrackTableId(table.DB.ID, tableID)
piTRIdTracker.TrackTableName(table.DB.Name.O, table.Info.Name.O)
}
// handle partition exchange after all tables are tracked
if err := checkPartitionExchangesViolations(logBackupTableHistory, snapshotDBMap,
snapshotTableMap, partitionMap, cfg); err != nil {
return errors.Trace(err)
}
return nil
}
// tweakLocalConfForRestore tweaks some TiDB configs to make the restore process go smoothly.
// It returns a function that restores the configs to their original values.
func tweakLocalConfForRestore() func() {
restoreConfig := config.RestoreFunc()
config.UpdateGlobal(func(conf *config.Config) {
// set max-index-length before executing DDLs and creating tables
// we set this value to the maximum (3072*4), otherwise we might fail to restore tables
// when both upstream and downstream set this value greater than the default (3072)
conf.MaxIndexLength = config.DefMaxOfMaxIndexLength
log.Warn("set max-index-length to max(3072*4) to skip check index length in DDL")
conf.IndexLimit = config.DefMaxOfIndexLimit
log.Warn("set index-limit to max(64*8) to skip check index count in DDL")
conf.TableColumnCountLimit = config.DefMaxOfTableColumnCountLimit
log.Warn("set table-column-count to max(4096) to skip check column count in DDL")
})
return restoreConfig
}
func getTiFlashNodeCount(ctx context.Context, pdClient pd.Client) (uint64, error) {
tiFlashStores, err := conn.GetAllTiKVStoresWithRetry(ctx, pdClient, connutil.TiFlashOnly)
if err != nil {
return 0, errors.Trace(err)
}
return uint64(len(tiFlashStores)), nil
}
// PreCheckTableTiFlashReplica checks whether TiFlash replica is less than TiFlash node.
func PreCheckTableTiFlashReplica(
ctx context.Context,
pdClient pd.Client,
tables []*metautil.Table,
recorder *tiflashrec.TiFlashRecorder,
isNextGenRestore bool,
) error {
if isNextGenRestore {
log.Warn("Restoring to NextGen TiFlash is experimental. TiFlash replicas are disabled; please reset them manually after restore.")
for _, tbl := range tables {
if tbl == nil || tbl.Info == nil {
// unreachable
continue
}
if tbl.Info.TiFlashReplica != nil {
tbl.Info.TiFlashReplica = nil
}
}
return nil
}
tiFlashStoreCount, err := getTiFlashNodeCount(ctx, pdClient)
if err != nil {
return err
}
for _, table := range tables {
if table.Info.TiFlashReplica != nil {
// we should not set available to true, because we cannot guarantee the raft log lag of tiflash when the restore finishes.
// just let the tiflash ticker set it by checking the lag of all related regions.
table.Info.TiFlashReplica.Available = false
table.Info.TiFlashReplica.AvailablePartitionIDs = nil
if recorder != nil {
recorder.AddTable(table.Info.ID, *table.Info.TiFlashReplica)
log.Info("record tiflash replica for table, to reset it by ddl later",
zap.Stringer("db", table.DB.Name),
zap.Stringer("table", table.Info.Name),
)
table.Info.TiFlashReplica = nil
} else if table.Info.TiFlashReplica.Count > tiFlashStoreCount {
// we cannot satisfy the TiFlash replica count in the restore cluster, so we should
// set TiFlashReplica to nil in tableInfo, to avoid TiDB making plans for TiFlash replicas it cannot sense
// see details at https://github.com/pingcap/br/issues/931
// TODO maybe set table.Info.TiFlashReplica.Count to tiFlashStoreCount, but we need more tests about it.
log.Warn("table does not satisfy tiflash replica requirements, set tiflash replica to unavailable",
zap.Stringer("db", table.DB.Name),
zap.Stringer("table", table.Info.Name),
zap.Uint64("expect tiflash replica", table.Info.TiFlashReplica.Count),
zap.Uint64("actual tiflash store", tiFlashStoreCount),
)
table.Info.TiFlashReplica = nil
}
}
}
return nil
}
// PreCheckTableClusterIndex checks whether the backed-up tables and existing tables have different clustered index options.
func PreCheckTableClusterIndex(
tables []*metautil.Table,
ddlJobs []*model.Job,
dom *domain.Domain,
) error {
for _, table := range tables {
oldTableInfo, err := restore.GetTableSchema(dom, table.DB.Name, table.Info.Name)
// table exists in database
if err == nil {
if table.Info.IsCommonHandle != oldTableInfo.IsCommonHandle {
log.Error("Clustered index option mismatch", zap.String("schemaName", table.DB.Name.O), zap.String("tableName", table.Info.Name.O))
return errors.Annotatef(berrors.ErrRestoreModeMismatch,
"Clustered index option mismatch. Restored cluster's @@tidb_enable_clustered_index should be %v (backup table = %v, created table = %v).",
restore.TransferBoolToValue(table.Info.IsCommonHandle),
table.Info.IsCommonHandle,
oldTableInfo.IsCommonHandle)
}
}
}
for _, job := range ddlJobs {
if job.Type == model.ActionCreateTable {
tableInfo := job.BinlogInfo.TableInfo
if tableInfo != nil {
oldTableInfo, err := restore.GetTableSchema(dom, ast.NewCIStr(job.SchemaName), tableInfo.Name)
// table exists in database
if err == nil {
if tableInfo.IsCommonHandle != oldTableInfo.IsCommonHandle {
log.Error("Clustered index option mismatch", zap.String("schemaName", job.SchemaName), zap.String("tableName", tableInfo.Name.O))
return errors.Annotatef(berrors.ErrRestoreModeMismatch,
"Clustered index option mismatch. Restored cluster's @@tidb_enable_clustered_index should be %v (backup table = %v, created table = %v).",
restore.TransferBoolToValue(tableInfo.IsCommonHandle),
tableInfo.IsCommonHandle,
oldTableInfo.IsCommonHandle)
}
}
}
}
}
return nil
}
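// getDatabases returns the deduplicated databases that the given tables belong to.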
func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) {
dbIDs := make(map[int64]bool)
for _, table := range tables {
if !dbIDs[table.DB.ID] {
dbs = append(dbs, table.DB)
dbIDs[table.DB.ID] = true
}
}
return
}
// FilterDDLJobs filters ddl jobs.
func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs []*model.Job) {
// Sort the ddl jobs by schema version in descending order.
slices.SortFunc(allDDLJobs, func(i, j *model.Job) int {
return cmp.Compare(j.BinlogInfo.SchemaVersion, i.BinlogInfo.SchemaVersion)
})
dbs := getDatabases(tables)
for _, db := range dbs {
// These maps are for solving some corner cases.
// e.g. let "t=2" indicate that the id of database "t" is 2, if the ddl execution sequence is:
// rename "a" to "b"(a=1) -> drop "b"(b=1) -> create "b"(b=2) -> rename "b" to "a"(a=2)
// then we cannot find the "create" DDL by name and id directly.
// To cover this case, we must find all names and ids the database/table ever had.
dbIDs := make(map[int64]bool)
dbIDs[db.ID] = true
dbNames := make(map[string]bool)
dbNames[db.Name.String()] = true
for _, job := range allDDLJobs {
if job.BinlogInfo.DBInfo != nil {
if dbIDs[job.SchemaID] || dbNames[job.BinlogInfo.DBInfo.Name.String()] {
ddlJobs = append(ddlJobs, job)
// For the jobs executed with the old id, like step 2 in the example above.
dbIDs[job.SchemaID] = true
// For the jobs executed after rename, like the step 3 in the example above.
dbNames[job.BinlogInfo.DBInfo.Name.String()] = true
}
}
}
}
for _, table := range tables {
tableIDs := make(map[int64]bool)
tableIDs[table.Info.ID] = true
tableNames := make(map[restore.UniqueTableName]bool)
name := restore.UniqueTableName{DB: table.DB.Name.String(), Table: table.Info.Name.String()}
tableNames[name] = true
for _, job := range allDDLJobs {
if job.BinlogInfo.TableInfo != nil {
name = restore.UniqueTableName{DB: job.SchemaName, Table: job.BinlogInfo.TableInfo.Name.String()}
if tableIDs[job.TableID] || tableNames[name] {
ddlJobs = append(ddlJobs, job)
tableIDs[job.TableID] = true
// For truncate table, the id may be changed
tableIDs[job.BinlogInfo.TableInfo.ID] = true
tableNames[name] = true
}
}
}
}
return ddlJobs
}
// CheckDDLJobByRules returns an error if any job in srcDDLJobs matches one of the rules.
func CheckDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) error {
for _, ddlJob := range srcDDLJobs {
for _, rule := range rules {
if rule(ddlJob) {
return errors.Annotatef(berrors.ErrRestoreModeMismatch, "DDL job %s is not allowed in incremental restore"+
" when --allow-pitr-from-incremental enabled", ddlJob.String())
}
}
}
return nil
}
// FilterDDLJobByRules filters out the jobs in srcDDLJobs for which any of the rules returns true.
func FilterDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) (dstDDLJobs []*model.Job) {
dstDDLJobs = make([]*model.Job, 0, len(srcDDLJobs))
for _, ddlJob := range srcDDLJobs {
passed := true
for _, rule := range rules {
if rule(ddlJob) {
passed = false
break
}
}
if passed {
dstDDLJobs = append(dstDDLJobs, ddlJob)
}
}
return
}
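// encodeCompactAndCheckKey encodes the preallocated table ID range into the key
// range passed to the restore as CompactProtectStartKey/CompactProtectEndKey.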
func encodeCompactAndCheckKey(codec tikv.Codec, preAlloced [2]int64) ([]byte, []byte) {
checkStartKey := tablecodec.EncodeTablePrefix(preAlloced[0])
checkEndKey := tablecodec.EncodeTablePrefix(preAlloced[1])
return codec.EncodeRange(checkStartKey, checkEndKey)
}
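// rewriteKeyRanges converts the preallocated table ID range into a single
// encoded key range used by the fine-grained restore pre-work; it returns nil
// if no range was preallocated.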
func rewriteKeyRanges(preAlloced [2]int64) [][2]kv.Key {
if preAlloced == [2]int64{} {
return nil
}
startKey := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(preAlloced[0]))
endKey := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(preAlloced[1]))
return [][2]kv.Key{{startKey, endKey}}
}
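// DDLJobFilterRule checks whether a DDL job should be filtered out.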
type DDLJobFilterRule func(ddlJob *model.Job) bool
var incrementalRestoreActionBlockList = map[model.ActionType]struct{}{
model.ActionSetTiFlashReplica: {},
model.ActionUpdateTiFlashReplicaStatus: {},
model.ActionLockTable: {},
model.ActionUnlockTable: {},
}
var logIncrementalRestoreCompactibleBlockList = map[model.ActionType]struct{}{
model.ActionAddIndex: {},
model.ActionModifyColumn: {},
model.ActionReorganizePartition: {},
}
// DDLJobBlockListRule is a rule that filters out DDL jobs whose type is in the incremental restore block list.
func DDLJobBlockListRule(ddlJob *model.Job) bool {
return checkIsInActions(ddlJob.Type, incrementalRestoreActionBlockList)
}
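// DDLJobLogIncrementalCompactBlockListRule is a rule that filters out DDL jobs
// whose type (add index, modify column, reorganize partition) is incompatible
// with applying a log backup on top of an incremental restore.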
func DDLJobLogIncrementalCompactBlockListRule(ddlJob *model.Job) bool {
return checkIsInActions(ddlJob.Type, logIncrementalRestoreCompactibleBlockList)
}
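// checkIsInActions reports whether the action type is in the given set.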
func checkIsInActions(action model.ActionType, actions map[model.ActionType]struct{}) bool {
_, ok := actions[action]
return ok
}
// checkpointEnabledAndExists returns true if the checkpoint is enabled and checkpoint metadata has already been persisted
func checkpointEnabledAndExists(
ctx context.Context,
g glue.Glue,
cfg *SnapshotRestoreConfig,
mgr *conn.Mgr) (bool, error) {
var checkpointEnabledAndExist = false
if cfg.UseCheckpoint {
if len(cfg.CheckpointStorage) > 0 {
clusterID := mgr.PDClient().GetClusterID(ctx)
if err := cfg.newStorageCheckpointMetaManagerSnapshot(ctx, clusterID, cfg.RestoreID); err != nil {
return false, errors.Trace(err)
}
} else {
if err := cfg.newTableCheckpointMetaManagerSnapshot(g, mgr.GetDomain(), cfg.RestoreID); err != nil {
return false, errors.Trace(err)
}
}
// if the checkpoint metadata exists in the checkpoint storage, this is not
// the first run of the restore.
existsCheckpointMetadata, err := cfg.snapshotCheckpointMetaManager.ExistsCheckpointMetadata(ctx)
if err != nil {
return false, errors.Trace(err)
}
checkpointEnabledAndExist = existsCheckpointMetadata
}
return checkpointEnabledAndExist, nil
}
// checkMandatoryClusterRequirements checks
// 1. kv mode
// 2. whether the databases and tables are in the backup when it's a restore db or restore table command
// 3. whether the cluster is empty when it's a full cluster restore without filters or checkpoints
// these checks are mandatory since they cannot be turned off
func checkMandatoryClusterRequirements(client *snapclient.SnapClient, cfg *SnapshotRestoreConfig,
checkpointEnabledAndExists bool, cmdName string) error {
// verify dbs and tables are in backup
if err := VerifyDBAndTableInBackup(client.GetDatabases(), cfg.RestoreConfig); err != nil {
return errors.Trace(err)
}
if isFullRestore(cmdName) {
if client.NeedCheckFreshCluster(cfg.ExplicitFilter, checkpointEnabledAndExists) {
if err := client.EnsureNoUserTables(); err != nil {
return errors.Trace(err)
}
}
}
return nil
}
// checkOptionalClusterRequirements checks disk space and table existence. It's optional since it can be
// turned off via the config cfg.CheckRequirements
func checkOptionalClusterRequirements(
ctx context.Context,
client *snapclient.SnapClient,
cfg *SnapshotRestoreConfig,
checkpointEnabledAndExists bool,
mgr *conn.Mgr,
tables []*metautil.Table,
archiveSize uint64,
isPitr bool) error {
if cfg.CheckRequirements && !checkpointEnabledAndExists {
if err := checkDiskSpace(ctx, mgr, tables, archiveSize); err != nil {
return errors.Trace(err)
}
if !client.IsIncremental() {
if err := checkTableExistence(ctx, mgr, tables); err != nil {
return errors.Trace(err)
}
if isPitr {
if err := checkTableExistence(ctx, mgr, buildLogBackupMetaTables(cfg.PiTRTableTracker.DBNameToTableNames)); err != nil {
return errors.Trace(err)
}
}
}
}
return nil
}
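// buildLogBackupMetaTables builds name-only metautil.Table stubs for the tables
// recorded during log backup, so their existence in the target cluster can be checked.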
func buildLogBackupMetaTables(dbNameToTableNames map[string]map[string]struct{}) []*metautil.Table {
tables := make([]*metautil.Table, 0)
for dbName, tableNames := range dbNameToTableNames {
for tableName := range tableNames {
table := &metautil.Table{
DB: &model.DBInfo{
Name: ast.NewCIStr(dbName),
},
Info: &model.TableInfo{
Name: ast.NewCIStr(tableName),
},
}
tables = append(tables, table)
}
}
return tables
}
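// createDBsAndTables creates the filtered databases and tables in the target
// cluster. For an incremental restore without --allow-pitr-from-incremental it
// first fetches a new ts so the backfilled upstream data is covered by it.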
func createDBsAndTables(
ctx context.Context,
client *snapclient.SnapClient,
cfg *SnapshotRestoreConfig,
mgr *conn.Mgr,
dbs []*metautil.Database,
tables []*metautil.Table) ([]*restoreutils.CreatedTable, error) {
var newTS uint64
if client.IsIncremental() {
if !cfg.AllowPITRFromIncremental {
// we need to get a new ts after executing the DDLs,
// otherwise data backfilled in the upstream may not
// be covered by the new ts.
// see https://github.com/pingcap/tidb/issues/54426
var err error
newTS, err = restore.GetTSWithRetry(ctx, mgr.GetPDClient())
if err != nil {
return nil, errors.Trace(err)
}
}
}
// create databases first; creation is skipped for databases that already exist
if err := client.CreateDatabases(ctx, dbs); err != nil {
return nil, errors.Trace(err)
}
createdTables, err := client.CreateTables(ctx, tables, newTS)
if err != nil {
return nil, errors.Trace(err)
}
return createdTables, nil
}
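// setTablesRestoreModeIfNeeded sets the table mode to TableModeRestore for
// tables in a filtered, non-incremental PiTR restore; sequences are skipped
// because extra steps run after their creation and restore mode would block them.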
func setTablesRestoreModeIfNeeded(tables []*metautil.Table, cfg *SnapshotRestoreConfig, isPiTR bool,
isIncremental bool) {
if cfg.ExplicitFilter && isPiTR && !isIncremental {
for i, table := range tables {
// skip sequences since extra steps are needed after creation and restore mode would block them
if table.Info.IsSequence() {
continue
}
tableCopy := *table
tableCopy.Info = table.Info.Clone()
tableCopy.Info.Mode = model.TableModeRestore
tables[i] = &tableCopy
}
log.Info("set tables to restore mode for filtered PiTR restore", zap.Int("table count", len(tables)))
}
}
// RunRestoreAbort aborts a restore task by finding it in the registry and cleaning up
// Similar to resumeOrCreate, it first resolves the restoredTS then finds and deletes the matching paused task
func RunRestoreAbort(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
cfg.Adjust()
defer summary.Summary(cmdName)
ctx, cancel := context.WithCancel(c)
defer cancel()
// update keyspace to be user specified
config.UpdateGlobal(func(conf *config.Config) {
conf.KeyspaceName = cfg.KeyspaceName
})
keepaliveCfg := GetKeepalive(&cfg.Config)
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, keepaliveCfg, cfg.CheckRequirements, true, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
defer mgr.Close()
// get upstream cluster ID and startTS from backup storage if not already set
if cfg.UpstreamClusterID == 0 {
if IsStreamRestore(cmdName) {
// For PiTR restore, get cluster ID from log storage
_, s, err := GetStorage(ctx, cfg.Config.Storage, &cfg.Config)
if err != nil {
return errors.Trace(err)
}
logInfo, err := getLogInfoFromStorage(ctx, s)
if err != nil {
return errors.Trace(err)
}
cfg.UpstreamClusterID = logInfo.clusterID
// For PiTR with full backup, get startTS from full backup meta
if len(cfg.FullBackupStorage) > 0 && cfg.StartTS == 0 {
startTS, fullClusterID, err := getFullBackupTS(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
if logInfo.clusterID > 0 && fullClusterID > 0 && logInfo.clusterID != fullClusterID {
return errors.Annotatef(berrors.ErrInvalidArgument,
"cluster ID mismatch: log backup from cluster %d, full backup from cluster %d",
logInfo.clusterID, fullClusterID)
}
cfg.StartTS = startTS
log.Info("extracted startTS from full backup storage for abort",
zap.Uint64("start_ts", cfg.StartTS))
}
} else {
// For snapshot restore, get cluster ID from backup meta
_, _, backupMeta, err := ReadBackupMeta(ctx, metautil.MetaFile, &cfg.Config)
if err != nil {
return errors.Trace(err)
}
cfg.UpstreamClusterID = backupMeta.ClusterId
}
log.Info("extracted upstream cluster ID from backup storage for abort",
zap.Uint64("upstream_cluster_id", cfg.UpstreamClusterID),
zap.String("cmd", cmdName))
}
// build restore registry
restoreRegistry, err := registry.NewRestoreRegistry(ctx, g, mgr.GetDomain())
if err != nil {
return errors.Trace(err)
}
defer restoreRegistry.Close()
// determine if restoredTS was user-specified
// if RestoreTS is 0, it means user didn't specify it (similar to resumeOrCreate logic)
isRestoredTSUserSpecified := cfg.RestoreTS != 0
// create registration info from config to find matching tasks
registrationInfo := registry.RegistrationInfo{
FilterStrings: cfg.FilterStr,
StartTS: cfg.StartTS,
RestoredTS: cfg.RestoreTS,
UpstreamClusterID: cfg.UpstreamClusterID,
WithSysTable: cfg.WithSysTable,
Cmd: cmdName,
}
// find and delete matching paused task atomically
// this will first resolve the restoredTS (similar to resumeOrCreate) then find and delete the task
deletedRestoreID, err := restoreRegistry.FindAndDeleteMatchingTask(ctx, registrationInfo, isRestoredTSUserSpecified)
if err != nil {
return errors.Trace(err)
}
if deletedRestoreID == 0 {
log.Info("no paused restore task found with matching parameters")
return nil
}
log.Info("successfully deleted matching paused restore task", zap.Uint64("restoreId", deletedRestoreID))
// clean up checkpoint data for the deleted task
log.Info("cleaning up checkpoint data", zap.Uint64("restoreId", deletedRestoreID))
// update config with restore ID to clean up checkpoint
cfg.RestoreID = deletedRestoreID
// initialize all checkpoint managers for cleanup (deletion is a no-op if the checkpoints don't exist)
if len(cfg.CheckpointStorage) > 0 {
clusterID := mgr.GetPDClient().GetClusterID(ctx)
log.Info("initializing storage checkpoint meta managers for cleanup",
zap.Uint64("restoreID", deletedRestoreID),
zap.Uint64("clusterID", clusterID))
if err := cfg.newStorageCheckpointMetaManagerPITR(ctx, clusterID, deletedRestoreID); err != nil {
log.Warn("failed to initialize storage checkpoint meta managers for cleanup", zap.Error(err))
}
} else {
log.Info("initializing table checkpoint meta managers for cleanup",
zap.Uint64("restoreID", deletedRestoreID))
if err := cfg.newTableCheckpointMetaManagerPITR(g, mgr.GetDomain(), deletedRestoreID); err != nil {
log.Warn("failed to initialize table checkpoint meta managers for cleanup", zap.Error(err))
}
}
// clean up checkpoint data
cleanUpCheckpoints(ctx, cfg)
log.Info("successfully aborted restore task and cleaned up checkpoint data. "+
"Use drop statements to clean up the restored data from the cluster if you want to.",
zap.Uint64("restoreId", deletedRestoreID))
return nil
}