tidb/br/pkg/task/backup.go

// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
package task
import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/docker/go-units"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/backup"
"github.com/pingcap/tidb/br/pkg/checkpoint"
"github.com/pingcap/tidb/br/pkg/conn"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/storeapi"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics/handle"
"github.com/pingcap/tidb/pkg/types"
"github.com/spf13/pflag"
"github.com/tikv/client-go/v2/oracle"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)
const (
flagBackupTimeago = "timeago"
flagBackupTS = "backupts"
flagLastBackupTS = "lastbackupts"
flagCompressionType = "compression"
flagCompressionLevel = "compression-level"
flagRemoveSchedulers = "remove-schedulers"
flagRangeLimit = "range-limit"
flagIgnoreStats = "ignore-stats"
flagUseBackupMetaV2 = "use-backupmeta-v2"
flagUseCheckpoint = "use-checkpoint"
flagKeyspaceName = "keyspace-name"
flagReplicaReadLabel = "replica-read-label"
flagTableConcurrency = "table-concurrency"
flagGCTTL = "gcttl"
defaultBackupConcurrency = 4
maxBackupConcurrency = 256
)
const (
FullBackupCmd = "Full Backup"
DBBackupCmd = "Database Backup"
TableBackupCmd = "Table Backup"
RawBackupCmd = "Raw Backup"
TxnBackupCmd = "Txn Backup"
)
// CompressionConfig is the configuration for sst file compression.
type CompressionConfig struct {
CompressionType backuppb.CompressionType `json:"compression-type" toml:"compression-type"`
CompressionLevel int32 `json:"compression-level" toml:"compression-level"`
}
// BackupConfig is the configuration specific for backup tasks.
type BackupConfig struct {
Config
TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
RangeLimit int `json:"range-limit" toml:"range-limit"`
IgnoreStats bool `json:"ignore-stats" toml:"ignore-stats"`
UseBackupMetaV2 bool `json:"use-backupmeta-v2"`
UseCheckpoint bool `json:"use-checkpoint" toml:"use-checkpoint"`
ReplicaReadLabel map[string]string `json:"replica-read-label" toml:"replica-read-label"`
TableConcurrency uint `json:"table-concurrency" toml:"table-concurrency"`
CompressionConfig
// for ebs-based backup
FullBackupType FullBackupType `json:"full-backup-type" toml:"full-backup-type"`
VolumeFile string `json:"volume-file" toml:"volume-file"`
SkipAWS bool `json:"skip-aws" toml:"skip-aws"`
CloudAPIConcurrency uint `json:"cloud-api-concurrency" toml:"cloud-api-concurrency"`
ProgressFile string `json:"progress-file" toml:"progress-file"`
SkipPauseGCAndScheduler bool `json:"skip-pause-gc-and-scheduler" toml:"skip-pause-gc-and-scheduler"`
}
// DefineBackupFlags defines common flags for the backup command.
func DefineBackupFlags(flags *pflag.FlagSet) {
flags.Duration(
flagBackupTimeago, 0,
"The history version of the backup task, e.g. 1m, 1h. Do not exceed GCSafePoint")
// TODO: remove experimental tag if it's stable
flags.Uint64(flagLastBackupTS, 0, "(experimental) the backup ts of the last backup,"+
" used for incremental backup, supports TSO only")
flags.String(flagBackupTS, "", "the backup ts, supports TSO or datetime,"+
" e.g. '400036290571534337', '2018-05-11 01:42:23'")
flags.Int64(flagGCTTL, utils.DefaultBRGCSafePointTTL, "the TTL (in seconds) that PD holds for BR's GC safepoint")
flags.String(flagCompressionType, "zstd",
"backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'")
flags.Int32(flagCompressionLevel, 0, "compression level used for sst file compression")
flags.Uint32(flagConcurrency, 4,
"Controls how many backup requests are sent out in parallel to one TiKV node. "+
"This doesn't directly impact performance — keeping the default is fine in most cases. "+
"Change TiKV's 'backup.num-threads' to adjust actual backup throughput.")
flags.Uint(flagTableConcurrency, backup.DefaultSchemaConcurrency, "The size of the BR thread pool used for backing up table metas, "+
"including tableInfo/checksum and stats.")
flags.Bool(flagRemoveSchedulers, false,
"disable the balance, shuffle and region-merge schedulers in PD to speed up backup")
// This flag can impact the online cluster, so hide it in case of abuse.
_ = flags.MarkHidden(flagRemoveSchedulers)
flags.Int(flagRangeLimit, backup.RangesSentThreshold, "limits the number of ranges marshaled at the same time when sent to many TiKVs.")
// Disable stats by default.
// TODO: we need a better way to backup/restore stats.
flags.Bool(flagIgnoreStats, true, "ignore backup stats")
flags.Bool(flagUseBackupMetaV2, true,
"use backup meta v2 to store meta info")
flags.String(flagKeyspaceName, "", "keyspace name for backup")
// This flag will change the structure of backupmeta.
// We must make sure the previous three versions of BR can parse the v2 meta to keep compatibility,
// so this flag should default to false for those three versions.
// For example:
// if we ship this feature in v4.0.14, then v4.0.14 BR can parse v2 meta
// but still generates v1 meta because this flag is false; v4.0.15 and v4.0.16 behave the same.
// Finally v4.0.17 sets this flag to true and generates v2 meta.
//
// The current version is v7.4.0, so the flag can default to true.
// _ = flags.MarkHidden(flagUseBackupMetaV2)
flags.Bool(flagUseCheckpoint, true, "use checkpoint mode")
_ = flags.MarkHidden(flagUseCheckpoint)
flags.String(flagReplicaReadLabel, "", "specify the label of the stores to be used for backup, e.g. 'label_key:label_value'")
}
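// exampleBackupFlagParsing is an illustrative sketch (not called by BR itself)
// of how the flags defined above flow into a BackupConfig: define the flags on
// a pflag.FlagSet, parse the command line, then let ParseFromFlags and Adjust
// fill and normalize the config. The flag values mentioned below are hypothetical.
func exampleBackupFlagParsing(args []string) (BackupConfig, error) {
	fs := pflag.NewFlagSet("backup-example", pflag.ContinueOnError)
	DefineBackupFlags(fs)
	// e.g. args = []string{"--timeago=1h", "--compression=lz4"}
	if err := fs.Parse(args); err != nil {
		return BackupConfig{}, errors.Trace(err)
	}
	cfg := BackupConfig{}
	// Skip the common Config flags here; callers such as DefaultBackupConfig
	// supply cfg.Config separately.
	if err := cfg.ParseFromFlags(fs, true); err != nil {
		return BackupConfig{}, errors.Trace(err)
	}
	cfg.Adjust()
	return cfg, nil
}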
// ParseFromFlags parses the backup-related flags from the flag set.
func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet, skipCommonConfig bool) error {
timeAgo, err := flags.GetDuration(flagBackupTimeago)
if err != nil {
return errors.Trace(err)
}
if timeAgo < 0 {
return errors.Annotate(berrors.ErrInvalidArgument, "negative timeago is not allowed")
}
cfg.TimeAgo = timeAgo
cfg.LastBackupTS, err = flags.GetUint64(flagLastBackupTS)
if err != nil {
return errors.Trace(err)
}
backupTS, err := flags.GetString(flagBackupTS)
if err != nil {
return errors.Trace(err)
}
cfg.BackupTS, err = ParseTSString(backupTS, false)
if err != nil {
return errors.Trace(err)
}
cfg.UseBackupMetaV2, err = flags.GetBool(flagUseBackupMetaV2)
if err != nil {
return errors.Trace(err)
}
cfg.UseCheckpoint, err = flags.GetBool(flagUseCheckpoint)
if err != nil {
return errors.Trace(err)
}
if cfg.LastBackupTS > 0 {
// TODO: make checkpoint mode compatible with incremental backup
cfg.UseCheckpoint = false
log.Info("since incremental backup is used, turn off checkpoint mode")
}
gcTTL, err := flags.GetInt64(flagGCTTL)
if err != nil {
return errors.Trace(err)
}
cfg.GCTTL = gcTTL
cfg.Concurrency, err = flags.GetUint32(flagConcurrency)
if err != nil {
return errors.Trace(err)
}
if cfg.TableConcurrency, err = flags.GetUint(flagTableConcurrency); err != nil {
return errors.Trace(err)
}
compressionCfg, err := parseCompressionFlags(flags)
if err != nil {
return errors.Trace(err)
}
cfg.CompressionConfig = *compressionCfg
// parse common flags if needed
if !skipCommonConfig {
if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
}
}
cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers)
if err != nil {
return errors.Trace(err)
}
cfg.RangeLimit, err = flags.GetInt(flagRangeLimit)
if err != nil {
return errors.Trace(err)
}
if cfg.RangeLimit <= 0 {
return errors.Errorf("the parameter `--range-limit` should be larger than 0")
}
cfg.IgnoreStats, err = flags.GetBool(flagIgnoreStats)
if err != nil {
return errors.Trace(err)
}
cfg.KeyspaceName, err = flags.GetString(flagKeyspaceName)
if err != nil {
return errors.Trace(err)
}
if flags.Lookup(flagFullBackupType) != nil {
// for full backup
fullBackupType, err := flags.GetString(flagFullBackupType)
if err != nil {
return errors.Trace(err)
}
if !FullBackupType(fullBackupType).Valid() {
return errors.New("invalid full backup type")
}
cfg.FullBackupType = FullBackupType(fullBackupType)
cfg.SkipAWS, err = flags.GetBool(flagSkipAWS)
if err != nil {
return errors.Trace(err)
}
cfg.CloudAPIConcurrency, err = flags.GetUint(flagCloudAPIConcurrency)
if err != nil {
return errors.Trace(err)
}
cfg.VolumeFile, err = flags.GetString(flagBackupVolumeFile)
if err != nil {
return errors.Trace(err)
}
cfg.ProgressFile, err = flags.GetString(flagProgressFile)
if err != nil {
return errors.Trace(err)
}
cfg.SkipPauseGCAndScheduler, err = flags.GetBool(flagOperatorPausedGCAndSchedulers)
if err != nil {
return errors.Trace(err)
}
}
cfg.ReplicaReadLabel, err = parseReplicaReadLabelFlag(flags)
if err != nil {
return errors.Trace(err)
}
return nil
}
// parseCompressionFlags parses the compression-related flags from the flag set.
func parseCompressionFlags(flags *pflag.FlagSet) (*CompressionConfig, error) {
compressionStr, err := flags.GetString(flagCompressionType)
if err != nil {
return nil, errors.Trace(err)
}
compressionType, err := parseCompressionType(compressionStr)
if err != nil {
return nil, errors.Trace(err)
}
level, err := flags.GetInt32(flagCompressionLevel)
if err != nil {
return nil, errors.Trace(err)
}
return &CompressionConfig{
CompressionLevel: level,
CompressionType: compressionType,
}, nil
}
// Adjust is used by both the BR binary and BR inside TiDB.
// When a new config field is added but not handled by the flag parser,
// set its proper default value here so that the binary and TiDB
// use the same default.
func (cfg *BackupConfig) Adjust() {
cfg.adjust()
usingDefaultConcurrency := false
if cfg.Config.Concurrency == 0 {
cfg.Config.Concurrency = defaultBackupConcurrency
usingDefaultConcurrency = true
}
if cfg.Config.Concurrency > maxBackupConcurrency {
cfg.Config.Concurrency = maxBackupConcurrency
}
if cfg.RateLimit != unlimited {
// TiKV applies the upload rate limit per backup request.
// When backup requests are sent concurrently,
// the rate limit cannot work as intended.
// Degenerate to sending backup requests sequentially to avoid this.
if !usingDefaultConcurrency {
logutil.WarnTerm("setting `--ratelimit` and `--concurrency` at the same time, "+
"ignoring `--concurrency`: `--ratelimit` forces sequential (i.e. concurrency = 1) backup",
zap.String("ratelimit", units.HumanSize(float64(cfg.RateLimit))+"/s"),
zap.Uint32("concurrency-specified", cfg.Config.Concurrency))
}
cfg.Config.Concurrency = 1
}
if cfg.GCTTL == 0 {
cfg.GCTTL = utils.DefaultBRGCSafePointTTL
}
// Use zstd as default
if cfg.CompressionType == backuppb.CompressionType_UNKNOWN {
cfg.CompressionType = backuppb.CompressionType_ZSTD
}
if cfg.CloudAPIConcurrency == 0 {
cfg.CloudAPIConcurrency = defaultCloudAPIConcurrency
}
}
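// exampleAdjustRateLimit is an illustrative sketch (not called by BR itself) of
// the rate-limit handling in Adjust above: because TiKV applies the rate limit
// per backup request, any user-specified concurrency is collapsed to 1 once a
// rate limit is set. The field values below are hypothetical.
func exampleAdjustRateLimit() {
	cfg := BackupConfig{Config: Config{RateLimit: 100 * units.MiB, Concurrency: 8}}
	cfg.Adjust()
	// cfg.Concurrency is now 1; GCTTL, CompressionType and CloudAPIConcurrency
	// have been filled with their defaults as well.
	log.Info("adjusted example backup config",
		zap.Uint32("concurrency", cfg.Concurrency),
		zap.Int64("gc-ttl", cfg.GCTTL))
}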
type immutableBackupConfig struct {
LastBackupTS uint64 `json:"last-backup-ts"`
IgnoreStats bool `json:"ignore-stats"`
UseCheckpoint bool `json:"use-checkpoint"`
objstore.BackendOptions
Storage string `json:"storage"`
PD []string `json:"pd"`
SendCreds bool `json:"send-credentials-to-tikv"`
NoCreds bool `json:"no-credentials"`
FilterStr []string `json:"filter-strings"`
CipherInfo backuppb.CipherInfo `json:"cipher"`
KeyspaceName string `json:"keyspace-name"`
}
// Hash returns a rough hash of the immutable parts of the backup config for the checkpoint checker.
func (cfg *BackupConfig) Hash() ([]byte, error) {
config := &immutableBackupConfig{
LastBackupTS: cfg.LastBackupTS,
IgnoreStats: cfg.IgnoreStats,
UseCheckpoint: cfg.UseCheckpoint,
BackendOptions: cfg.BackendOptions,
Storage: cfg.Storage,
PD: cfg.PD,
SendCreds: cfg.SendCreds,
NoCreds: cfg.NoCreds,
FilterStr: cfg.FilterStr,
CipherInfo: cfg.CipherInfo,
KeyspaceName: cfg.KeyspaceName,
}
data, err := json.Marshal(config)
if err != nil {
return nil, errors.Trace(err)
}
hash := sha256.Sum256(data)
return hash[:], nil
}
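// exampleCheckpointHashCheck is an illustrative sketch (not how CheckCheckpoint
// is implemented) of what the rough Hash above is for: a retried backup may
// tweak mutable knobs such as the compression level, but the fields collected
// in immutableBackupConfig must match the checkpointed run, which a simple
// hash comparison can detect.
func exampleCheckpointHashCheck(checkpointedHash []byte, cfg *BackupConfig) (bool, error) {
	currentHash, err := cfg.Hash()
	if err != nil {
		return false, errors.Trace(err)
	}
	// Equal hashes: the retry targets the same backup and may reuse the
	// checkpoint data; otherwise the checkpoint must not be reused.
	return string(currentHash) == string(checkpointedHash), nil
}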
func isFullBackup(cmdName string) bool {
return cmdName == FullBackupCmd
}
// RunBackup starts a backup task inside the current goroutine.
func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig) error {
cfg.Adjust()
config.UpdateGlobal(func(conf *config.Config) {
conf.KeyspaceName = cfg.KeyspaceName
})
defer summary.Summary(cmdName)
ctx, cancel := context.WithCancel(c)
defer cancel()
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("task.RunBackup", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
isIncrementalBackup := cfg.LastBackupTS > 0
skipChecksum := !cfg.Checksum || isIncrementalBackup
u, err := objstore.ParseBackend(cfg.Storage, &cfg.BackendOptions)
if err != nil {
return errors.Trace(err)
}
// If noop is used as the external storage, turn off checkpoint mode.
if u.GetNoop() != nil {
log.Info("since noop external storage is used, turn off checkpoint mode")
cfg.UseCheckpoint = false
}
skipStats := cfg.IgnoreStats
// For backup, Domain is not needed if the user ignores stats.
// Domain loads all table info into memory. By skipping Domain, we save
// lots of memory (about 500MB for 40K YCSB tables with 40 fields each).
needDomain := !skipStats
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
defer mgr.Close()
// After the version check, check whether the cluster supports checkpoint mode.
if cfg.UseCheckpoint {
err = version.CheckCheckpointSupport()
if err != nil {
log.Warn("unable to use checkpoint mode, fall back to normal mode", zap.Error(err))
cfg.UseCheckpoint = false
}
}
var statsHandle *handle.Handle
if !skipStats {
statsHandle = mgr.GetDomain().StatsHandle()
}
var newCollationEnable string
err = g.UseOneShotSession(mgr.GetStorage(), !needDomain, func(se glue.Session) error {
newCollationEnable, err = se.GetGlobalVariable(utils.GetTidbNewCollationEnabled())
if err != nil {
return errors.Trace(err)
}
log.Info(fmt.Sprintf("get %s config from mysql.tidb table", utils.TidbNewCollationEnabled),
zap.String(utils.GetTidbNewCollationEnabled(), newCollationEnable))
return nil
})
if err != nil {
return errors.Trace(err)
}
client := backup.NewTableBackupClient(ctx, mgr)
// set cipher only for checkpoint
client.SetCipher(&cfg.CipherInfo)
// set skip checksum status
client.SetSkipChecksum(skipChecksum)
opts := storeapi.Options{
NoCredentials: cfg.NoCreds,
SendCredentials: cfg.SendCreds,
CheckS3ObjectLockOptions: true,
}
if err = client.SetStorageAndCheckNotInUse(ctx, u, &opts); err != nil {
return errors.Trace(err)
}
// If checkpoint mode is not used for this run but checkpoint meta exists,
// CheckCheckpoint stops the backup.
cfgHash, err := cfg.Hash()
if err != nil {
return errors.Trace(err)
}
err = client.CheckCheckpoint(cfgHash)
if err != nil {
return errors.Trace(err)
}
err = client.SetLockFile(ctx)
if err != nil {
return errors.Trace(err)
}
// If checkpoint mode is used and gcTTL is still the default value,
// bump gcTTL to the checkpoint's default GC TTL.
if cfg.UseCheckpoint && cfg.GCTTL == utils.DefaultBRGCSafePointTTL {
cfg.GCTTL = utils.DefaultCheckpointGCSafePointTTL
log.Info("use checkpoint's default GC TTL", zap.Int64("GC TTL", cfg.GCTTL))
}
client.SetGCTTL(cfg.GCTTL)
backupTS, err := client.GetTS(ctx, cfg.TimeAgo, cfg.BackupTS)
if err != nil {
return errors.Trace(err)
}
g.Record("BackupTS", backupTS)
safePointID := client.GetSafePointID()
sp := utils.BRServiceSafePoint{
BackupTS: backupTS,
TTL: client.GetGCTTL(),
ID: safePointID,
}
// use lastBackupTS as the safePoint for incremental backups
if isIncrementalBackup {
sp.BackupTS = cfg.LastBackupTS
}
log.Info("current backup safePoint job", zap.Object("safePoint", sp))
cctx, gcSafePointKeeperCancel := context.WithCancel(ctx)
gcSafePointKeeperRemovable := false
defer func() {
// don't reset the gc-safe-point if checkpoint mode is used and backup is not finished
if cfg.UseCheckpoint && !gcSafePointKeeperRemovable {
log.Info("skip removing gc-safepoint keeper for next retry", zap.String("gc-id", sp.ID))
return
}
log.Info("start to remove gc-safepoint keeper")
// close the gc safe point keeper at first
gcSafePointKeeperCancel()
// set the ttl to 0 to remove the gc-safe-point
sp.TTL = 0
if err := utils.UpdateServiceSafePoint(ctx, mgr.GetPDClient(), sp); err != nil {
log.Warn("failed to update service safe point, backup may fail if gc triggered",
zap.Error(err),
)
}
log.Info("finish removing gc-safepoint keeper")
}()
err = utils.StartServiceSafePointKeeper(cctx, mgr.GetPDClient(), sp)
if err != nil {
return errors.Trace(err)
}
if cfg.RemoveSchedulers {
log.Debug("removing some PD schedulers")
restore, e := mgr.RemoveSchedulers(ctx)
defer func() {
if ctx.Err() != nil {
log.Warn("context canceled, doing clean work with background context")
ctx = context.Background()
}
if restoreE := restore(ctx); restoreE != nil {
log.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}
}()
if e != nil {
// Trace the scheduler-removal error itself, not the stale outer err.
return errors.Trace(e)
}
}
req := backuppb.BackupRequest{
ClusterId: client.GetClusterID(),
StartVersion: cfg.LastBackupTS,
EndVersion: backupTS,
RateLimit: cfg.RateLimit,
StorageBackend: client.GetStorageBackend(),
Concurrency: defaultBackupConcurrency,
CompressionType: cfg.CompressionType,
CompressionLevel: cfg.CompressionLevel,
CipherInfo: &cfg.CipherInfo,
ReplicaRead: len(cfg.ReplicaReadLabel) != 0,
Context: &kvrpcpb.Context{
ResourceControlContext: &kvrpcpb.ResourceControlContext{
ResourceGroupName: "", // TODO,
},
RequestSource: kvutil.BuildRequestSource(true, kv.InternalTxnBR, kvutil.ExplicitTypeBR),
},
}
brVersion := g.GetVersion()
clusterVersion, err := mgr.GetClusterVersion(ctx)
if err != nil {
return errors.Trace(err)
}
ranges, schemas, policies, err := client.BuildBackupRangeAndSchema(mgr.GetStorage(), cfg.TableFilter, backupTS, isFullBackup(cmdName))
if err != nil {
return errors.Trace(err)
}
// Metafile size should be less than 64MB.
metawriter := metautil.NewMetaWriter(client.GetStorage(),
metautil.MetaFileSize, cfg.UseBackupMetaV2, "", &cfg.CipherInfo)
// Hacky way to update backupmeta.
metawriter.Update(func(m *backuppb.BackupMeta) {
m.StartVersion = req.StartVersion
m.EndVersion = req.EndVersion
m.IsRawKv = req.IsRawKv
m.ClusterId = req.ClusterId
m.ClusterVersion = clusterVersion
m.BrVersion = brVersion
m.NewCollationsEnabled = newCollationEnable
m.ApiVersion = mgr.GetStorage().GetCodec().GetAPIVersion()
})
log.Info("get placement policies", zap.Int("count", len(policies)))
if len(policies) != 0 {
metawriter.Update(func(m *backuppb.BackupMeta) {
m.Policies = policies
})
}
// Check ranges and schemas; if there is nothing to back up, return early.
if len(ranges) == 0 && (schemas == nil || schemas.Len() == 0) {
pdAddress := strings.Join(cfg.PD, ",")
log.Warn("Nothing to backup, maybe connected to cluster for restoring",
zap.String("PD address", pdAddress))
err = metawriter.FlushBackupMeta(ctx)
if err == nil {
summary.SetSuccessStatus(true)
}
return err
}
if isIncrementalBackup {
if backupTS <= cfg.LastBackupTS {
log.Error("LastBackupTS is larger or equal to current TS")
return errors.Annotate(berrors.ErrInvalidArgument, "LastBackupTS is larger or equal to current TS")
}
err = utils.CheckGCSafePoint(ctx, mgr.GetPDClient(), cfg.LastBackupTS)
if err != nil {
log.Error("Check gc safepoint for last backup ts failed", zap.Error(err))
return errors.Trace(err)
}
metawriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metawriter, g, mgr.GetStorage(), cfg.LastBackupTS, backupTS, needDomain)
if err != nil {
return errors.Trace(err)
}
if err = metawriter.FinishWriteMetas(ctx, metautil.AppendDDL); err != nil {
return errors.Trace(err)
}
}
summary.CollectInt("backup total ranges", len(ranges))
progressTotalCount, progressUnit, err := getProgressCountOfRanges(ctx, mgr, ranges)
if err != nil {
return errors.Trace(err)
}
// Redirect to log if there is no log file to avoid unreadable output.
updateCh := g.StartProgress(
ctx, cmdName, int64(progressTotalCount), !cfg.LogProgress)
progressCount := uint64(0)
progressCallBack := func(callBackUnit backup.ProgressUnit) {
if progressUnit == callBackUnit {
updateCh.Inc()
failpoint.Inject("progress-call-back", func(v failpoint.Value) {
log.Info("failpoint progress-call-back injected")
atomic.AddUint64(&progressCount, 1)
if fileName, ok := v.(string); ok {
f, osErr := os.OpenFile(fileName, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if osErr != nil {
log.Warn("failed to create file", zap.Error(osErr))
} else {
msg := fmt.Appendf(nil, "%s:%d\n", progressUnit, atomic.LoadUint64(&progressCount))
// Use a local error so the outer err captured by this closure is not clobbered.
if _, writeErr := f.Write(msg); writeErr != nil {
log.Warn("failed to write data to file", zap.Error(writeErr))
}
_ = f.Close()
}
}
})
}
}
if cfg.UseCheckpoint {
if err = client.StartCheckpointRunner(ctx, cfgHash, backupTS, safePointID, progressCallBack); err != nil {
return errors.Trace(err)
}
defer func() {
if !gcSafePointKeeperRemovable {
log.Info("wait for flush checkpoint...")
client.WaitForFinishCheckpoint(ctx, true)
} else {
log.Info("start to remove checkpoint data for backup")
client.WaitForFinishCheckpoint(ctx, false)
if removeErr := checkpoint.RemoveCheckpointDataForBackup(ctx, client.GetStorage()); removeErr != nil {
log.Warn("failed to remove checkpoint data for backup", zap.Error(removeErr))
} else {
log.Info("the checkpoint data for backup is removed.")
}
}
}()
}
failpoint.Inject("s3-outage-during-writing-file", func(v failpoint.Value) {
log.Info("failpoint s3-outage-during-writing-file injected, " +
"process will sleep for 5s and notify the shell to kill s3 service.")
if sigFile, ok := v.(string); ok {
file, err := os.Create(sigFile)
if err != nil {
log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
}
if file != nil {
file.Close()
}
}
time.Sleep(5 * time.Second)
})
metawriter.StartWriteMetasAsync(ctx, metautil.AppendDataFile)
checksumMap, err := client.BackupRanges(ctx, ranges, req, uint(cfg.Concurrency), cfg.RangeLimit, cfg.ReplicaReadLabel, metawriter, progressCallBack)
if err != nil {
return errors.Trace(err)
}
// Backup has finished
updateCh.Close()
err = metawriter.FinishWriteMetas(ctx, metautil.AppendDataFile)
if err != nil {
return errors.Trace(err)
}
var checksumProgress int64 = 0
// if checksumMap is not empty, then checksumProgress will be set to len(schemas)
if len(checksumMap) > 0 {
checksumProgress = int64(schemas.Len())
}
if skipChecksum {
if isIncrementalBackup {
// Since we don't support checksum for incremental data, fast checksum should be skipped.
log.Info("Skip fast checksum in incremental backup")
} else {
// When user specified not to calculate checksum, don't calculate checksum.
log.Info("Skip fast checksum")
}
}
updateCh = g.StartProgress(ctx, "Checksum", checksumProgress, !cfg.LogProgress)
if schemas != nil && schemas.Len() > 0 {
schemasConcurrency := min(cfg.TableConcurrency, uint(schemas.Len()))
err = schemas.BackupSchemas(
ctx, metawriter, client.GetCheckpointRunner(), mgr.GetStorage(), statsHandle, backupTS, checksumMap, schemasConcurrency, cfg.ChecksumConcurrency, skipChecksum, updateCh)
if err != nil {
return errors.Trace(err)
}
}
err = metawriter.FlushBackupMeta(ctx)
if err != nil {
return errors.Trace(err)
}
// Since backupmeta has been flushed to the external storage,
// we can remove the gc safepoint keeper
gcSafePointKeeperRemovable = true
// Checksum has finished, close checksum progress.
updateCh.Close()
archiveSize := metawriter.ArchiveSize()
g.Record(summary.BackupDataSize, archiveSize)
// A backup triggered from TiDB fetches a general Size; see https://github.com/pingcap/tidb/issues/27247
g.Record("Size", archiveSize)
// Set task summary to success status.
summary.SetSuccessStatus(true)
return nil
}
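// getProgressCountOfRanges decides the unit used for the backup progress bar:
// with more than 1000 ranges it reports progress per range and skips the
// per-range region-count queries; otherwise it sums the approximate region
// count so that progress advances per region.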
func getProgressCountOfRanges(
ctx context.Context,
mgr *conn.Mgr,
ranges []rtree.KeyRange,
) (int, backup.ProgressUnit, error) {
if len(ranges) > 1000 {
return len(ranges), backup.UnitRange, nil
}
failpoint.Inject("progress-call-back", func(_ failpoint.Value) {
if len(ranges) > 100 {
failpoint.Return(len(ranges), backup.UnitRange, nil)
}
})
// The approximate number of regions that need to be backed up
approximateRegions := 0
for _, r := range ranges {
regionCount, err := mgr.GetRegionCount(ctx, r.StartKey, r.EndKey)
if err != nil {
return 0, backup.UnitRegion, errors.Trace(err)
}
approximateRegions += regionCount
}
summary.CollectInt("backup total regions", approximateRegions)
return approximateRegions, backup.UnitRegion, nil
}
// ParseTSString is ported from TiDB's setSnapshotTS.
func ParseTSString(ts string, tzCheck bool) (uint64, error) {
if len(ts) == 0 {
return 0, nil
}
if tso, err := strconv.ParseUint(ts, 10, 64); err == nil {
return tso, nil
}
loc := time.Local
sc := stmtctx.NewStmtCtxWithTimeZone(loc)
if tzCheck {
tzIdx, _, _, _, _ := types.GetTimezone(ts)
if tzIdx < 0 {
return 0, errors.Errorf("must set timezone when using datetime format ts, e.g. '2018-05-11 01:42:23+0800'")
}
}
t, err := types.ParseTime(sc.TypeCtx(), ts, mysql.TypeTimestamp, types.MaxFsp)
if err != nil {
return 0, errors.Trace(err)
}
t1, err := t.GoTime(loc)
if err != nil {
return 0, errors.Trace(err)
}
return oracle.GoTimeToTS(t1), nil
}
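// exampleParseBackupTS is an illustrative sketch (not called by BR itself) of
// the two formats ParseTSString accepts for --backupts: a raw TSO number and a
// datetime string. With tzCheck enabled, the datetime must carry an explicit
// timezone, as the error message above suggests.
func exampleParseBackupTS() error {
	// A numeric string is treated as a TSO and returned as-is.
	tso, err := ParseTSString("400036290571534337", false)
	if err != nil {
		return errors.Trace(err)
	}
	// A datetime is converted to a TSO; "+0800" satisfies the timezone check.
	dt, err := ParseTSString("2018-05-11 01:42:23+0800", true)
	if err != nil {
		return errors.Trace(err)
	}
	log.Info("parsed example backup ts", zap.Uint64("tso", tso), zap.Uint64("datetime-ts", dt))
	return nil
}
// DefaultBackupConfig returns a BackupConfig whose backup-specific fields are
// filled with the flag defaults defined above and whose common part is taken
// from commonConfig.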
func DefaultBackupConfig(commonConfig Config) BackupConfig {
fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError)
DefineBackupFlags(fs)
cfg := BackupConfig{}
err := cfg.ParseFromFlags(fs, true)
if err != nil {
log.Panic("failed to parse backup flags to config", zap.Error(err))
}
cfg.Config = commonConfig
return cfg
}
func parseCompressionType(s string) (backuppb.CompressionType, error) {
var ct backuppb.CompressionType
switch s {
case "lz4":
ct = backuppb.CompressionType_LZ4
case "snappy":
ct = backuppb.CompressionType_SNAPPY
case "zstd":
ct = backuppb.CompressionType_ZSTD
default:
return backuppb.CompressionType_UNKNOWN, errors.Annotatef(berrors.ErrInvalidArgument, "invalid compression type '%s'", s)
}
return ct, nil
}
func parseReplicaReadLabelFlag(flags *pflag.FlagSet) (map[string]string, error) {
replicaReadLabelStr, err := flags.GetString(flagReplicaReadLabel)
if err != nil {
return nil, errors.Trace(err)
}
if replicaReadLabelStr == "" {
return nil, nil
}
kv := strings.Split(replicaReadLabelStr, ":")
if len(kv) != 2 {
return nil, errors.Annotatef(berrors.ErrInvalidArgument, "invalid replica read label '%s'", replicaReadLabelStr)
}
return map[string]string{kv[0]: kv[1]}, nil
}
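// exampleReplicaReadLabel is an illustrative sketch (not called by BR itself)
// of the single "key:value" format parseReplicaReadLabelFlag expects; the
// label value below is hypothetical.
func exampleReplicaReadLabel() (map[string]string, error) {
	fs := pflag.NewFlagSet("replica-read-example", pflag.ContinueOnError)
	fs.String(flagReplicaReadLabel, "", "replica read label")
	if err := fs.Parse([]string{"--replica-read-label=zone:us-west-1"}); err != nil {
		return nil, errors.Trace(err)
	}
	// Yields map[string]string{"zone": "us-west-1"}.
	return parseReplicaReadLabelFlag(fs)
}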