// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package data

import (
	"context"
	stdErr "errors"
	"io"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/kvproto/pkg/metapb"
	recovpb "github.com/pingcap/kvproto/pkg/recoverdatapb"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/common"
	"github.com/pingcap/tidb/br/pkg/conn"
	"github.com/pingcap/tidb/br/pkg/glue"
	"github.com/pingcap/tidb/br/pkg/logutil"
	"github.com/pingcap/tidb/br/pkg/utils"
	"github.com/pingcap/tidb/br/pkg/utils/storewatch"
	"github.com/pingcap/tidb/pkg/ddl"
	"github.com/pingcap/tidb/pkg/util"
	tikvstore "github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/txnkv/rangetask"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
	"google.golang.org/grpc"
	"google.golang.org/grpc/backoff"
)

const gRPCBackOffMaxDelay = 3 * time.Second

// RecoveryStage identifies the step of the cluster recovery at which an error
// occurred; isRetryErr uses it to decide whether the whole procedure is retried.
type RecoveryStage int

const (
	StageUnknown RecoveryStage = iota
	StageCollectingMeta
	StageMakingRecoveryPlan
	StageResetPDAllocateID
	StageRecovering
	StageFlashback
)

func (s RecoveryStage) String() string {
	switch s {
	case StageCollectingMeta:
		return "collecting meta"
	case StageMakingRecoveryPlan:
		return "making recovery plan"
	case StageResetPDAllocateID:
		return "resetting PD allocate ID"
	case StageRecovering:
		return "recovering"
	case StageFlashback:
		return "flashback"
	default:
		return "unknown"
	}
}

type recoveryError struct {
	error
	atStage RecoveryStage
}

func atStage(err error) RecoveryStage {
	var recoveryErr recoveryError
	if stdErr.As(err, &recoveryErr) {
		return recoveryErr.atStage
	}
	return StageUnknown
}

func isRetryErr(err error) bool {
	stage := atStage(err)
	switch stage {
	case StageCollectingMeta, StageMakingRecoveryPlan, StageResetPDAllocateID, StageRecovering:
		log.Info("Recovery data retrying.", zap.Error(err), zap.Stringer("stage", stage))
		return true
	case StageFlashback:
		log.Info("Giving up retry for flashback stage.", zap.Error(err), zap.Stringer("stage", stage))
		return false
	default:
		log.Warn("unknown stage of recovery for backoff.", zap.Int("val", int(stage)))
		return false
	}
}

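// The stage tag on recoveryError is what drives the retry decision above; a
// minimal illustrative sketch (not part of the recovery flow):
//
//	err := recoveryError{error: stdErr.New("injected"), atStage: StageCollectingMeta}
//	_ = isRetryErr(err) // true: collecting meta is safe to retry
//	err = recoveryError{error: stdErr.New("injected"), atStage: StageFlashback}
//	_ = isRetryErr(err) // false: flashback is never retried by the outer loop
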
// RecoverData recovers the TiKV cluster:
// 1. read all region metadata from every TiKV store
// 2. make the recovery plan, then recover the max allocate ID first
// 3. send the recovery plan and wait for TiKV to apply it; during the wait-apply, every assigned region leader catches its apply log up to the last log
// 4. ensure all regions have applied up to the last log
// 5. prepare the flashback
// 6. flashback to resolveTS
func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
	// Roughly handle the case that some TiKVs are rebooted during making the plan.
	// Generally, retrying the whole procedure is fine for most cases. But perhaps we could do a finer-grained retry,
	// say, reuse the recovery plan, and probably skip rebasing the PD allocation ID once it has been done.
	return utils.WithRetryV2(ctx, utils.NewRecoveryBackoffStrategy(isRetryErr), func(ctx context.Context) (int, error) {
		return doRecoveryData(ctx, resolveTS, allStores, mgr, progress, restoreTS, concurrency)
	})
}

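// Illustrative usage sketch (the surrounding restore task is assumed to supply
// mgr, progress and the two timestamps; the concurrency of 128 is only an
// example value):
//
//	totalRegions, err := RecoverData(ctx, resolveTS, allStores, mgr, progress, restoreTS, 128)
//	if err != nil {
//		return errors.Trace(err)
//	}
//	log.Info("data recovered", zap.Int("regions", totalRegions))
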
func doRecoveryData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error) {
	var cancel context.CancelFunc
	ctx, cancel = context.WithCancel(ctx)
	defer cancel()

	var recovery = NewRecovery(allStores, mgr, progress, concurrency)
	if err := recovery.ReadRegionMeta(ctx); err != nil {
		return 0, recoveryError{error: err, atStage: StageCollectingMeta}
	}

	totalRegions := recovery.GetTotalRegions()

	if err := recovery.MakeRecoveryPlan(); err != nil {
		return totalRegions, recoveryError{error: err, atStage: StageMakingRecoveryPlan}
	}

	log.Info("recover the alloc id to pd", zap.Uint64("max alloc id", recovery.MaxAllocID))
	if err := recovery.mgr.RecoverBaseAllocID(ctx, recovery.MaxAllocID); err != nil {
		return totalRegions, recoveryError{error: err, atStage: StageResetPDAllocateID}
	}

	// Once a TiKV instance shuts down and then reboots, it may be left with no leader because of the recovery mode.
	// This watcher re-triggers `RecoverRegionOfStore` for those stores.
	recovery.SpawnTiKVShutDownWatchers(ctx)
	if err := recovery.RecoverRegions(ctx); err != nil {
		return totalRegions, recoveryError{error: err, atStage: StageRecovering}
	}

	if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil {
		return totalRegions, recoveryError{error: err, atStage: StageFlashback}
	}

	if err := recovery.FlashbackToVersion(ctx, resolveTS, restoreTS); err != nil {
		return totalRegions, recoveryError{error: err, atStage: StageFlashback}
	}

	return totalRegions, nil
}

// StoreMeta holds the region metadata reported by one TiKV store.
type StoreMeta struct {
	StoreId     uint64
	RegionMetas []*recovpb.RegionMeta
}

// NewStoreMeta creates an empty StoreMeta for the given store.
func NewStoreMeta(storeId uint64) StoreMeta {
	var meta = make([]*recovpb.RegionMeta, 0)
	return StoreMeta{storeId, meta}
}

// for test
type Recovery struct {
	allStores    []*metapb.Store
	StoreMetas   []StoreMeta
	RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest
	MaxAllocID   uint64
	mgr          *conn.Mgr
	progress     glue.Progress
	concurrency  uint32
}

// NewRecovery creates a Recovery for the given stores.
func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, concurrency uint32) Recovery {
	totalStores := len(allStores)
	var StoreMetas = make([]StoreMeta, totalStores)
	var regionRecovers = make(map[uint64][]*recovpb.RecoverRegionRequest, totalStores)
	return Recovery{
		allStores:    allStores,
		StoreMetas:   StoreMetas,
		RecoveryPlan: regionRecovers,
		MaxAllocID:   0,
		mgr:          mgr,
		progress:     progress,
		concurrency:  concurrency}
}

func (recovery *Recovery) newRecoveryClient(ctx context.Context, storeAddr string) (recovpb.RecoverDataClient, *grpc.ClientConn, error) {
	// Connect to the Recovery service on the given TiKV node.
	bfConf := backoff.DefaultConfig
	bfConf.MaxDelay = gRPCBackOffMaxDelay

	conn, err := utils.GRPCConn(ctx, storeAddr, recovery.mgr.GetTLSConfig(),
		grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
		grpc.WithKeepaliveParams(recovery.mgr.GetKeepalive()),
	)
	if err != nil {
		return nil, conn, errors.Trace(err)
	}

	client := recovpb.NewRecoverDataClient(conn)

	return client, conn, nil
}

func getStoreAddress(allStores []*metapb.Store, storeId uint64) string {
	var addr string
	for _, store := range allStores {
		if store.GetId() == storeId {
			addr = store.GetAddress()
		}
	}

	if len(addr) == 0 {
		log.Error("there is no tikv store with this id", zap.Uint64("store id", storeId))
	}
	return addr
}

// ReadRegionMeta reads all region metadata from the TiKV stores.
func (recovery *Recovery) ReadRegionMeta(ctx context.Context) error {
	eg, ectx := errgroup.WithContext(ctx)
	totalStores := len(recovery.allStores)
	workers := util.NewWorkerPool(uint(min(totalStores, common.MaxStoreConcurrency)), "Collect Region Meta") // TODO: int overflow?

	// TODO: optimize the ErrorGroup when a TiKV panics
	metaChan := make(chan StoreMeta, 1024)
	defer close(metaChan)

	for i := range totalStores {
		storeId := recovery.allStores[i].GetId()
		storeAddr := recovery.allStores[i].GetAddress()

		if err := ectx.Err(); err != nil {
			break
		}

		workers.ApplyOnErrorGroup(eg, func() error {
			recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
			if err != nil {
				return errors.Trace(err)
			}
			defer conn.Close()
			log.Info("read meta from tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId))
			stream, err := recoveryClient.ReadRegionMeta(ectx, &recovpb.ReadRegionMetaRequest{StoreId: storeId})
			if err != nil {
				log.Error("read region meta failed", zap.Uint64("store id", storeId))
				return errors.Trace(err)
			}

			storeMeta := NewStoreMeta(storeId)
			// receive the metadata stream from this TiKV
			for {
				var meta *recovpb.RegionMeta
				if meta, err = stream.Recv(); err == nil {
					storeMeta.RegionMetas = append(storeMeta.RegionMetas, meta)
				} else if err == io.EOF {
					// reached the end of the stream, or the server closed the connection
					break
				} else {
					return errors.Trace(err)
				}
			}

			metaChan <- storeMeta
			return nil
		})
	}

	for i := range totalStores {
		select {
		case <-ectx.Done(): // error or cancellation; eg.Wait() will catch the error
			break
		case storeMeta := <-metaChan:
			recovery.StoreMetas[i] = storeMeta
			log.Info("received region meta from", zap.Int("store", int(storeMeta.StoreId)))
		}

		recovery.progress.Inc()
	}

	return eg.Wait()
}

// GetTotalRegions returns the number of distinct regions observed across all stores.
func (recovery *Recovery) GetTotalRegions() int {
	// Group region peer info by region id.
	var regions = make(map[uint64]struct{}, 0)
	for _, v := range recovery.StoreMetas {
		for _, m := range v.RegionMetas {
			if _, ok := regions[m.RegionId]; !ok {
				regions[m.RegionId] = struct{}{}
			}
		}
	}
	return len(regions)
}

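// Illustrative sketch (assumed values): regions are deduplicated by RegionId
// across stores, so three peers of the same region held on stores 1, 2 and 3
// still count as a single region:
//
//	recovery.StoreMetas = []StoreMeta{
//		{StoreId: 1, RegionMetas: []*recovpb.RegionMeta{{RegionId: 100}}},
//		{StoreId: 2, RegionMetas: []*recovpb.RegionMeta{{RegionId: 100}}},
//		{StoreId: 3, RegionMetas: []*recovpb.RegionMeta{{RegionId: 100}}},
//	}
//	total := recovery.GetTotalRegions() // total == 1
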
// RecoverRegionOfStore sends the recovery plan of one store to its TiKV instance over a gRPC stream.
func (recovery *Recovery) RecoverRegionOfStore(ctx context.Context, storeID uint64, plan []*recovpb.RecoverRegionRequest) error {
	storeAddr := getStoreAddress(recovery.allStores, storeID)
	recoveryClient, conn, err := recovery.newRecoveryClient(ctx, storeAddr)
	if err != nil {
		log.Error("create tikv client failed", zap.Uint64("store id", storeID))
		return errors.Trace(err)
	}
	defer conn.Close()
	log.Info("send recover region to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeID))
	stream, err := recoveryClient.RecoverRegion(ctx)
	if err != nil {
		log.Error("create recover region failed", zap.Uint64("store id", storeID))
		return errors.Trace(err)
	}

	// send this store's requests over the stream
	for _, s := range plan {
		if err = stream.Send(s); err != nil {
			log.Error("send recover region failed", zap.Error(err))
			return errors.Trace(err)
		}
	}

	reply, err := stream.CloseAndRecv()
	if err != nil {
		log.Error("close the stream failed")
		return errors.Trace(err)
	}
	recovery.progress.Inc()
	log.Info("recover region execution success", zap.Uint64("store id", reply.GetStoreId()))
	return nil
}

// RecoverRegions sends the recovery plan to the TiKV stores to recover their regions (force leader, etc.).
// Only stores that have regions to recover are sent a plan.
func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error) {
	eg, ectx := errgroup.WithContext(ctx)
	totalRecoveredStores := len(recovery.RecoveryPlan)
	workers := util.NewWorkerPool(uint(min(totalRecoveredStores, common.MaxStoreConcurrency)), "Recover Regions")

	for storeId, plan := range recovery.RecoveryPlan {
		if err := ectx.Err(); err != nil {
			break
		}
		workers.ApplyOnErrorGroup(eg, func() error {
			return recovery.RecoverRegionOfStore(ectx, storeId, plan)
		})
	}
	// Wait for all TiKV instances to force leader and to catch the apply up to the last log.
	return eg.Wait()
}

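// Illustrative sketch (assumed values): RecoveryPlan maps a store ID to the
// batch of requests streamed to that store, so a plan such as
//
//	recovery.RecoveryPlan = map[uint64][]*recovpb.RecoverRegionRequest{
//		1: {{RegionId: 100, AsLeader: true}},
//		2: {{Tombstone: true, AsLeader: false}},
//	}
//
// fans out into exactly two RecoverRegionOfStore calls, one per store, each
// sending its slice over a single gRPC stream.
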
// SpawnTiKVShutDownWatchers starts a background watcher that detects rebooted TiKV stores
// and re-sends their recovery plan so that their leaders are regenerated.
func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) {
	rebootStores := map[uint64]struct{}{}
	cb := storewatch.MakeCallback(storewatch.WithOnReboot(func(s *metapb.Store) {
		log.Info("Store reboot detected, will regenerate leaders.", zap.Uint64("id", s.GetId()))
		rebootStores[s.Id] = struct{}{}
	}), storewatch.WithOnDisconnect(func(s *metapb.Store) {
		log.Warn("A store disconnected.", zap.Uint64("id", s.GetId()), zap.String("addr", s.GetAddress()))
	}), storewatch.WithOnNewStoreRegistered(func(s *metapb.Store) {
		log.Info("Start to observe the state of the store.", zap.Uint64("id", s.GetId()))
	}))
	watcher := storewatch.New(recovery.mgr.PDClient(), cb)
	tick := time.NewTicker(30 * time.Second)
	mainLoop := func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
				err := watcher.Step(ctx)
				if err != nil {
					log.Warn("Failed to step watcher.", logutil.ShortError(err))
				}
				for id := range rebootStores {
					plan, ok := recovery.RecoveryPlan[id]
					if !ok {
						log.Warn("Store reboot detected, but no recovery plan found.", zap.Uint64("id", id))
						continue
					}
					err := recovery.RecoverRegionOfStore(ctx, id, plan)
					if err != nil {
						log.Warn("Store reboot detected, but failed to regenerate leader.", zap.Uint64("id", id), logutil.ShortError(err))
						continue
					}
					log.Info("Succeeded in reloading the leader in the store.", zap.Uint64("id", id))
					delete(rebootStores, id)
				}
			}
		}
	}

	go mainLoop()
}

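// Illustrative sketch (not part of the recovery flow): the watcher goroutine
// lives until the passed context is cancelled, so a caller that only wants it
// alive for the duration of the recovery can scope it like this:
//
//	watchCtx, stopWatch := context.WithCancel(ctx)
//	recovery.SpawnTiKVShutDownWatchers(watchCtx)
//	defer stopWatch() // stops the 30s polling loop once recovery is done
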
// PrepareFlashbackToVersion prepares the regions for the data flashback; the purpose is to stop
// region service and put the regions into the flashback state.
func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) {
	retryErr := utils.WithRetry(
		ctx,
		func() error {
			handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
				stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r)
				if err != nil {
					log.Warn("region may not be ready to serve, retry it...", zap.Error(err))
				}
				return stats, err
			}

			runner := rangetask.NewRangeTaskRunner("br-flashback-prepare-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler)
			// Run prepare flashback on the entire TiKV cluster. Empty keys mean the range is unbounded.
			err = runner.RunOnRange(ctx, []byte(""), []byte(""))
			if err != nil {
				log.Warn("region flashback prepare got an error")
				return errors.Trace(err)
			}
			log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions()))
			return nil
		}, utils.NewFlashBackBackoffStrategy())

	recovery.progress.Inc()
	return retryErr
}

// FlashbackToVersion flashes the region data back to version resolveTS.
func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint64, commitTS uint64) (err error) {
	handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
		stats, err := ddl.SendFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, commitTS-1, commitTS, r)
		return stats, err
	}

	runner := rangetask.NewRangeTaskRunner("br-flashback-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler)
	// Run flashback on the entire TiKV cluster. Empty keys mean the range is unbounded.
	err = runner.RunOnRange(ctx, []byte(""), []byte(""))
	if err != nil {
		log.Error("region flashback got an error",
			zap.Uint64("resolveTS", resolveTS),
			zap.Uint64("commitTS", commitTS),
			zap.Int("regions", runner.CompletedRegions()))
		return errors.Trace(err)
	}

	log.Info("region flashback complete",
		zap.Uint64("resolveTS", resolveTS),
		zap.Uint64("commitTS", commitTS),
		zap.Int("regions", runner.CompletedRegions()))

	recovery.progress.Inc()
	return nil
}

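// Illustrative sketch: doRecoveryData pairs the two flashback steps so that the
// prepare pass uses restoreTS-1 as its start timestamp while the flashback pass
// commits at restoreTS, both targeting the resolved version resolveTS:
//
//	if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil {
//		return errors.Trace(err)
//	}
//	if err := recovery.FlashbackToVersion(ctx, resolveTS, restoreTS); err != nil {
//		return errors.Trace(err)
//	}
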
// RecoverRegion pairs a region peer's metadata with the store it was collected from.
type RecoverRegion struct {
	*recovpb.RegionMeta
	StoreId uint64
}

// MakeRecoveryPlan generates the recovery plan to send to the TiKV stores:
// 1. check region overlap and make a recovery decision
// 2. build a leader list for all regions for the TiKV startup
// 3. get the max allocated ID
func (recovery *Recovery) MakeRecoveryPlan() error {
	storeBalanceScore := make(map[uint64]int, len(recovery.allStores))
	// Group region peer info by region id and find the max allocated ID.
	// regions: region id -> peers[0..n]
	var regions = make(map[uint64][]*RecoverRegion, 0)
	for _, v := range recovery.StoreMetas {
		storeId := v.StoreId
		maxId := storeId
		for _, m := range v.RegionMetas {
			if regions[m.RegionId] == nil {
				regions[m.RegionId] = make([]*RecoverRegion, 0, len(recovery.allStores))
			}
			regions[m.RegionId] = append(regions[m.RegionId], &RecoverRegion{m, storeId})
			maxId = max(maxId, max(m.RegionId, m.PeerId))
		}
		recovery.MaxAllocID = max(recovery.MaxAllocID, maxId)
	}

	regionInfos := SortRecoverRegions(regions)

	validPeers, err := CheckConsistencyAndValidPeer(regionInfos)
	if err != nil {
		return errors.Trace(err)
	}

	// All plans are grouped by store: key=StoreId, value=the request stream for that store.
	// Generate recover commands.
	for regionId, peers := range regions {
		if _, ok := validPeers[regionId]; !ok {
			// TODO: generate a tombstone command.
			// 1. the peer is a tombstone
			// 2. a region split is in progress; the old region can become a tombstone
			log.Warn("detected tombstone peer for region", zap.Uint64("region id", regionId))
			for _, peer := range peers {
				plan := &recovpb.RecoverRegionRequest{Tombstone: true, AsLeader: false}
				recovery.RecoveryPlan[peer.StoreId] = append(recovery.RecoveryPlan[peer.StoreId], plan)
			}
		} else {
			// Generate normal commands.
			log.Debug("detected valid region", zap.Uint64("region id", regionId))
			// compute the leader candidates
			leaderCandidates, err := LeaderCandidates(peers)
			if err != nil {
				log.Warn("region without peer", zap.Uint64("region id", regionId))
				return errors.Trace(err)
			}

			// select the leader based on the per-store balance score
			leader := SelectRegionLeader(storeBalanceScore, leaderCandidates)
			log.Debug("as leader peer", zap.Uint64("store id", leader.StoreId), zap.Uint64("region id", leader.RegionId))
			plan := &recovpb.RecoverRegionRequest{RegionId: leader.RegionId, AsLeader: true}
			recovery.RecoveryPlan[leader.StoreId] = append(recovery.RecoveryPlan[leader.StoreId], plan)
			storeBalanceScore[leader.StoreId] += 1
		}
	}
	return nil
}

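// Illustrative sketch (assumed values): storeBalanceScore counts the leaders
// already assigned per store, so SelectRegionLeader can keep spreading leaders
// across the cluster instead of piling them onto a single store, presumably
// favouring the store with the fewest leaders so far:
//
//	score := map[uint64]int{1: 2, 2: 0, 3: 1}
//	leader := SelectRegionLeader(score, leaderCandidates) // likely picks a peer on store 2
//	score[leader.StoreId]++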