// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package backup

import (
	"context"
	"io"
	"os"
	"sync"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	backuppb "github.com/pingcap/kvproto/pkg/brpb"
	"github.com/pingcap/kvproto/pkg/errorpb"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/logutil"
	"github.com/pingcap/tidb/br/pkg/utils"
	"github.com/pingcap/tidb/br/pkg/utils/storewatch"
	tidbutil "github.com/pingcap/tidb/pkg/util"
	pd "github.com/tikv/pd/client"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

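// BackupRetryPolicy describes which stores should be retried: a single store
// (One holds its store ID) or all stores (All is true).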
type BackupRetryPolicy struct {
	One uint64
	All bool
}

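// BackupSender is the interface for dispatching a backup request to one store
// asynchronously, streaming the results back through respCh.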
type BackupSender interface {
	SendAsync(
		ctx context.Context,
		round uint64,
		storeID uint64,
		limiter *ResourceConcurrentLimiter,
		request backuppb.BackupRequest,
		concurrency uint,
		cli backuppb.BackupClient,
		respCh chan *ResponseAndStore,
		StateNotifier chan BackupRetryPolicy)
}

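// ResponseAndStore bundles a backup response with the ID of the store that
// produced it.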
type ResponseAndStore struct {
	Resp    *backuppb.BackupResponse
	StoreID uint64
}

func (r ResponseAndStore) GetResponse() *backuppb.BackupResponse {
	return r.Resp
}

func (r ResponseAndStore) GetStoreID() uint64 {
	return r.StoreID
}

// timeoutRecv cancels the context if `Refresh()` is not called within the specified time `timeout`.
type timeoutRecv struct {
	storeID   uint64
	wg        sync.WaitGroup
	parentCtx context.Context
	cancel    context.CancelCauseFunc

	refresh chan struct{}
}

// Refresh resets the timeout ticker.
func (trecv *timeoutRecv) Refresh() {
	select {
	case <-trecv.parentCtx.Done():
	case trecv.refresh <- struct{}{}:
	}
}

// Stop stops the timeout ticker, waits for the watchdog goroutine to exit,
// and cancels the derived context.
func (trecv *timeoutRecv) Stop() {
	close(trecv.refresh)
	trecv.wg.Wait()
	trecv.cancel(nil)
}

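// TimeoutOneResponse is the longest interval to wait between two consecutive
// backup responses from one store before the stream is canceled. It is a
// package-level variable so it can be overridden (e.g. shortened in tests).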
var TimeoutOneResponse = time.Hour

func (trecv *timeoutRecv) loop(timeout time.Duration) {
	defer trecv.wg.Done()
	ticker := time.NewTicker(timeout)
	defer ticker.Stop()
	for {
		ticker.Reset(timeout)
		select {
		case <-trecv.parentCtx.Done():
			return
		case _, ok := <-trecv.refresh:
			if !ok {
				return
			}
		case <-ticker.C:
			log.Warn("wait backup response timeout, cancel the backup",
				zap.Duration("timeout", timeout), zap.Uint64("storeID", trecv.storeID))
			trecv.cancel(errors.Errorf("receive a backup response timeout"))
		}
	}
}

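// StartTimeoutRecv starts a watchdog that cancels the returned context if
// Refresh() is not called within `timeout`. Callers should defer Stop() on
// the returned timeoutRecv, e.g.:
//
//	ctx, timerecv := StartTimeoutRecv(pctx, TimeoutOneResponse, storeID)
//	defer timerecv.Stop()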
func StartTimeoutRecv(ctx context.Context, timeout time.Duration, storeID uint64) (context.Context, *timeoutRecv) {
	cctx, cancel := context.WithCancelCause(ctx)
	trecv := &timeoutRecv{
		storeID:   storeID,
		parentCtx: ctx,
		cancel:    cancel,
		refresh:   make(chan struct{}),
	}
	trecv.wg.Add(1)
	go trecv.loop(timeout)
	return cctx, trecv
}

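// doSendBackup opens a backup stream to one store and passes every received
// response to respFn until the stream ends with io.EOF. The limiter caps the
// total number of sub-ranges that may enter `client.Backup` concurrently.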
func doSendBackup(
	ctx context.Context,
	client backuppb.BackupClient,
	limiter *ResourceConcurrentLimiter,
	req backuppb.BackupRequest,
	respFn func(*backuppb.BackupResponse) error,
) error {
	failpoint.Inject("hint-backup-start", func(v failpoint.Value) {
		logutil.CL(ctx).Info("failpoint hint-backup-start injected, " +
			"process will notify the shell.")
		if sigFile, ok := v.(string); ok {
			file, err := os.Create(sigFile)
			if err != nil {
				log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
			}
			if file != nil {
				file.Close()
			}
		}
		time.Sleep(3 * time.Second)
	})
	reqStartKey, reqEndKey := req.StartKey, req.EndKey
	// Note: BR can set ranges into req.StartKey/req.EndKey, req.SubRanges or req.SortedSubRangesGroups.
	// TODO: reqRangeSize += len(req.SortedSubRangesGroups) if the merged-SST-files feature is implemented.
	reqRangeSize := len(req.SubRanges) + 1
	limiter.Acquire(reqRangeSize)
	bCli, err := client.Backup(ctx, &req)
	limiter.Release(reqRangeSize)
	// Note: dereference req here to let req.SubRanges be released as soon as possible.
	// That's because in the backup main loop, the sub-ranges are generated about every 15 seconds,
	// which may accumulate a large number of incomplete ranges.
	failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
		switch val.(string) {
		case "Unavailable":
			{
				logutil.CL(ctx).Debug("failpoint reset-retryable-error unavailable injected.")
				err = status.Error(codes.Unavailable, "Unavailable error")
			}
		case "Internal":
			{
				logutil.CL(ctx).Debug("failpoint reset-retryable-error internal injected.")
				err = status.Error(codes.Internal, "Internal error")
			}
		}
	})
	failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) {
		if val.(bool) {
			logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.")
			err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3")
		}
	})
	if err != nil {
		return err
	}
	defer func() {
		_ = bCli.CloseSend()
	}()

	for {
		resp, err := bCli.Recv()
		if err != nil {
			if errors.Cause(err) == io.EOF { // nolint:errorlint
				logutil.CL(ctx).Debug("backup streaming finish",
					logutil.Key("backup-start-key", reqStartKey),
					logutil.Key("backup-end-key", reqEndKey))
				return nil
			}
			return err
		}
		// TODO: handle errors in the resp.
		logutil.CL(ctx).Debug("range backed up",
			logutil.Key("small-range-start-key", resp.GetStartKey()),
			logutil.Key("small-range-end-key", resp.GetEndKey()),
			zap.Int("api-version", int(resp.ApiVersion)))
		err = respFn(resp)
		if err != nil {
			return errors.Trace(err)
		}
	}
}

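// startBackup sends the backup request to a single store, splitting it into
// up to `concurrency` sub-requests that are backed up in parallel. Each
// sub-request is retried with the backup SST backoff strategy, and every
// response is forwarded to respCh tagged with the store ID.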
func startBackup(
	pctx context.Context,
	storeID uint64,
	limiter *ResourceConcurrentLimiter,
	backupReq backuppb.BackupRequest,
	backupCli backuppb.BackupClient,
	concurrency uint,
	respCh chan *ResponseAndStore,
) error {
	// This goroutine handles the responses from a single store.
	select {
	case <-pctx.Done():
		return pctx.Err()
	default:
		// Send the backup request to the store.
		// Handle the backup responses and internal errors here.
		// Handle store errors (reboot or network partition) outside.
		reqs := SplitBackupReqRanges(backupReq, int(concurrency))
		logutil.CL(pctx).Info("starting backup to the corresponding store", zap.Uint64("storeID", storeID),
			zap.Int("requestCount", len(reqs)), zap.Uint("concurrency", concurrency))

		// Backup might be stuck on gRPC `waitonHeader`, so start a timeout ticker to
		// terminate the backup if it does not receive any new response for a long time.
		ctx, timerecv := StartTimeoutRecv(pctx, TimeoutOneResponse, storeID)
		defer timerecv.Stop()

		pool := tidbutil.NewWorkerPool(concurrency, "store_backup")
		eg, ectx := errgroup.WithContext(ctx)
		for i, req := range reqs {
			bkReq := req
			reqIndex := i
			pool.ApplyOnErrorGroup(eg, func() error {
				retry := -1
				return utils.WithRetry(ectx, func() error {
					retry += 1
					if retry > 1 {
						logutil.CL(ectx).Info("retry backup to store", zap.Uint64("storeID", storeID),
							zap.Int("retry", retry), zap.Int("reqIndex", reqIndex))
					}
					return doSendBackup(ectx, backupCli, limiter, bkReq, func(resp *backuppb.BackupResponse) error {
						// Forward all responses (including errors).
						failpoint.Inject("backup-timeout-error", func(val failpoint.Value) {
							msg := val.(string)
							logutil.CL(ectx).Info("failpoint backup-timeout-error injected.", zap.String("msg", msg))
							resp.Error = &backuppb.Error{
								Msg: msg,
							}
						})
						failpoint.Inject("backup-storage-error", func(val failpoint.Value) {
							msg := val.(string)
							logutil.CL(ectx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
							resp.Error = &backuppb.Error{
								Msg: msg,
							}
						})
						failpoint.Inject("tikv-rw-error", func(val failpoint.Value) {
							msg := val.(string)
							logutil.CL(ectx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg))
							resp.Error = &backuppb.Error{
								Msg: msg,
							}
						})
						failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
							msg := val.(string)
							logutil.CL(ectx).Debug("failpoint tikv-region-error injected.", zap.String("msg", msg))
							resp.Error = &backuppb.Error{
								// Msg: msg,
								Detail: &backuppb.Error_RegionError{
									RegionError: &errorpb.Error{
										Message: msg,
									},
								},
							}
						})
						select {
						case <-ectx.Done():
							return ectx.Err()
						case respCh <- &ResponseAndStore{
							Resp:    resp,
							StoreID: storeID,
						}:
							// Reset the timeout whenever a response arrives.
							timerecv.Refresh()
						}
						return nil
					})
				}, utils.NewBackupSSTBackoffStrategy())
			})
		}
		return eg.Wait()
	}
}

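// ObserveStoreChangesAsync spawns a goroutine that polls PD for store state
// changes. When a store reboots or disconnects, it notifies the caller (via
// stateNotifier) to retry all stores; when a new store registers, it notifies
// the caller to back up just that store.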
func ObserveStoreChangesAsync(ctx context.Context, stateNotifier chan BackupRetryPolicy, pdCli pd.Client) {
	go func() {
		sendAll := false
		newJoinStoresMap := make(map[uint64]struct{})
		cb := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) {
			sendAll = true
		}), storewatch.WithOnDisconnect(func(s *metapb.Store) {
			sendAll = true
		}), storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) {
			// Only back up this newly registered store.
			newJoinStoresMap[s.Id] = struct{}{}
		}))

		notifyFn := func(ctx context.Context, sendPolicy BackupRetryPolicy) {
			select {
			case <-ctx.Done():
			case stateNotifier <- sendPolicy:
			}
		}

		watcher := storewatch.New(pdCli, cb)
		// Take a first step so the state is correct for the next 30s check.
		err := watcher.Step(ctx)
		if err != nil {
			logutil.CL(ctx).Warn("failed to watch store changes at beginning, ignore it", zap.Error(err))
		}
		tickInterval := 30 * time.Second
		failpoint.Inject("backup-store-change-tick", func(val failpoint.Value) {
			if val.(bool) {
				tickInterval = 100 * time.Millisecond
			}
			logutil.CL(ctx).Info("failpoint backup-store-change-tick injected.", zap.Duration("interval", tickInterval))
		})
		tick := time.NewTicker(tickInterval)
		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
				// Reset the state.
				sendAll = false
				clear(newJoinStoresMap)
				logutil.CL(ctx).Info("check store changes every 30s")
				err := watcher.Step(ctx)
				if err != nil {
					logutil.CL(ctx).Warn("failed to watch store changes, ignore it", zap.Error(err))
				}
				if sendAll {
					logutil.CL(ctx).Info("detect some store(s) restarted or disconnected, notify with all stores")
					notifyFn(ctx, BackupRetryPolicy{All: true})
				} else if len(newJoinStoresMap) > 0 {
					for storeID := range newJoinStoresMap {
						logutil.CL(ctx).Info("detect a new registered store, notify with this store", zap.Uint64("storeID", storeID))
						notifyFn(ctx, BackupRetryPolicy{One: storeID})
					}
				}
			}
		}
	}()
}

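// SplitBackupReqRanges splits req.SubRanges into at most `count` requests
// whose range counts differ by at most one. For example, 10 sub-ranges with
// count=3 yields requests carrying 4, 3, and 3 sub-ranges.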
func SplitBackupReqRanges(req backuppb.BackupRequest, count int) []backuppb.BackupRequest {
	rangeCount := len(req.SubRanges)
	if rangeCount == 0 {
		return []backuppb.BackupRequest{req}
	}
	if count <= 1 {
		// 0/1 means no need to split; just send one batched request.
		return []backuppb.BackupRequest{req}
	}
	splitRequests := make([]backuppb.BackupRequest, 0, count)
	splitStep := rangeCount / count
	overCount := rangeCount - count*splitStep
	start := 0
	for i := range count {
		nextStart := start + splitStep
		if i < overCount {
			// Distribute the remainder: the first `overCount` requests carry one extra range.
			nextStart += 1
		} else if nextStart == start {
			// splitStep is 0 and the remainder is used up; no ranges left to assign.
			break
		}
		splitReq := req
		splitReq.SubRanges = req.SubRanges[start:nextStart]
		splitRequests = append(splitRequests, splitReq)
		start = nextStart
	}
	return splitRequests
}