log-backup: implement the checkpoint V3 (#36114)

close pingcap/tidb#35164
This commit is contained in:
山岚
2022-07-13 11:07:05 +08:00
committed by GitHub
parent d214d3eec5
commit fee2a12d69
32 changed files with 3043 additions and 284 deletions

View File

@ -16,6 +16,7 @@ package main
import (
"github.com/pingcap/errors"
advancercfg "github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/pingcap/tidb/br/pkg/trace"
"github.com/pingcap/tidb/br/pkg/utils"
@ -49,6 +50,7 @@ func NewStreamCommand() *cobra.Command {
newStreamStatusCommand(),
newStreamTruncateCommand(),
newStreamCheckCommand(),
newStreamAdvancerCommand(),
)
command.SetHelpFunc(func(command *cobra.Command, strings []string) {
task.HiddenFlagsForStream(command.Root().PersistentFlags())
@ -157,6 +159,21 @@ func newStreamCheckCommand() *cobra.Command {
return command
}
// newStreamAdvancerCommand builds the hidden `advancer` subcommand, which
// runs a central worker that advances the log backup checkpoint.
func newStreamAdvancerCommand() *cobra.Command {
	advancerCmd := &cobra.Command{
		Use:    "advancer",
		Short:  "Start a central worker for advancing the checkpoint. (only for debuging, this subcommand should be integrated to TiDB)",
		Args:   cobra.NoArgs,
		Hidden: true,
		RunE: func(cmd *cobra.Command, args []string) error {
			return streamCommand(cmd, task.StreamCtl)
		},
	}
	flags := advancerCmd.Flags()
	task.DefineStreamCommonFlags(flags)
	advancercfg.DefineFlagsForCheckpointAdvancerConfig(flags)
	return advancerCmd
}
func streamCommand(command *cobra.Command, cmdName string) error {
var cfg task.StreamConfig
var err error
@ -192,6 +209,13 @@ func streamCommand(command *cobra.Command, cmdName string) error {
if err = cfg.ParseStreamPauseFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
case task.StreamCtl:
if err = cfg.ParseStreamCommonFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
if err = cfg.AdvancerCfg.GetFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
default:
if err = cfg.ParseStreamCommonFromFlags(command.Flags()); err != nil {
return errors.Trace(err)

View File

@ -9,16 +9,14 @@ import (
"fmt"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
"github.com/docker/go-units"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
@ -35,9 +33,7 @@ import (
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
)
@ -49,83 +45,17 @@ const (
// DefaultMergeRegionKeyCount is the default region key count, 960000.
DefaultMergeRegionKeyCount uint64 = 960000
dialTimeout = 30 * time.Second
resetRetryTimes = 3
)
// Pool is a lazy pool of gRPC channels.
// When `Get` called, it lazily allocates new connection if connection not full.
// If it's full, then it will return allocated channels round-robin.
type Pool struct {
// mu guards conns and next.
mu sync.Mutex
// conns holds the connections allocated so far (at most cap of them).
conns []*grpc.ClientConn
// next is the round-robin cursor into conns, used once the pool is full.
next int
// cap is the maximum number of connections kept by the pool.
cap int
// newConn dials a fresh connection while the pool is not yet full.
newConn func(ctx context.Context) (*grpc.ClientConn, error)
}
// takeConns detaches every connection currently owned by the pool and returns
// them, leaving the pool empty with its round-robin cursor reset.
func (p *Pool) takeConns() (conns []*grpc.ClientConn) {
	p.mu.Lock()
	defer p.mu.Unlock()
	conns = p.conns
	p.conns = nil
	p.next = 0
	return conns
}
// Close closes the conn pool.
// Failures to close individual connections are logged and otherwise ignored.
func (p *Pool) Close() {
	conns := p.takeConns()
	for _, conn := range conns {
		err := conn.Close()
		if err != nil {
			log.Warn("failed to close clientConn", zap.String("target", conn.Target()), zap.Error(err))
		}
	}
}
// Get tries to get an existing connection from the pool, or make a new one if the pool not full.
func (p *Pool) Get(ctx context.Context) (*grpc.ClientConn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	// Below capacity: lazily dial a fresh connection and remember it.
	if len(p.conns) < p.cap {
		fresh, err := p.newConn(ctx)
		if err != nil {
			return nil, err
		}
		p.conns = append(p.conns, fresh)
		return fresh, nil
	}
	// Full: hand out the already-allocated channels round-robin.
	picked := p.conns[p.next]
	p.next = (p.next + 1) % p.cap
	return picked, nil
}
// NewConnPool creates a new Pool by the specified conn factory function and capacity.
func NewConnPool(capacity int, newConn func(ctx context.Context) (*grpc.ClientConn, error)) *Pool {
	pool := &Pool{
		cap:     capacity,
		conns:   make([]*grpc.ClientConn, 0, capacity),
		newConn: newConn,
	}
	return pool
}
// Mgr manages connections to a TiDB cluster.
type Mgr struct {
*pdutil.PdController
tlsConf *tls.Config
dom *domain.Domain
storage kv.Storage // Used to access SQL related interfaces.
tikvStore tikv.Storage // Used to access TiKV specific interfaces.
grpcClis struct {
mu sync.Mutex
clis map[uint64]*grpc.ClientConn
}
keepalive keepalive.ClientParameters
dom *domain.Domain
storage kv.Storage // Used to access SQL related interfaces.
tikvStore tikv.Storage // Used to access TiKV specific interfaces.
ownsStorage bool
*utils.StoreManager
}
// StoreBehavior is the action to do in GetAllTiKVStores when a non-TiKV
@ -298,122 +228,31 @@ func NewMgr(
storage: storage,
tikvStore: tikvStorage,
dom: dom,
tlsConf: tlsConf,
ownsStorage: g.OwnsStorage(),
grpcClis: struct {
mu sync.Mutex
clis map[uint64]*grpc.ClientConn
}{clis: make(map[uint64]*grpc.ClientConn)},
keepalive: keepalive,
StoreManager: utils.NewStoreManager(controller.GetPDClient(), keepalive, tlsConf),
}
return mgr, nil
}
// getGrpcConnLocked dials a new gRPC connection to the given store.
// NOTE(review): the "Locked" suffix suggests callers must hold
// mgr.grpcClis.mu — confirm at the call sites.
func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
// Test failpoint: create a signal file so an external shell can observe
// that a backup client is being fetched, then delay for a few seconds.
failpoint.Inject("hint-get-backup-client", func(v failpoint.Value) {
log.Info("failpoint hint-get-backup-client injected, "+
"process will notify the shell.", zap.Uint64("store", storeID))
if sigFile, ok := v.(string); ok {
file, err := os.Create(sigFile)
if err != nil {
log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
}
if file != nil {
file.Close()
}
}
time.Sleep(3 * time.Second)
})
// Resolve the store's address from PD.
store, err := mgr.GetPDClient().GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
// Use TLS when configured, plaintext otherwise.
opt := grpc.WithInsecure()
if mgr.tlsConf != nil {
opt = grpc.WithTransportCredentials(credentials.NewTLS(mgr.tlsConf))
}
ctx, cancel := context.WithTimeout(ctx, dialTimeout)
bfConf := backoff.DefaultConfig
bfConf.MaxDelay = time.Second * 3
// Prefer the peer address; fall back to the store address when unset.
addr := store.GetPeerAddress()
if addr == "" {
addr = store.GetAddress()
}
// Blocking dial bounded by dialTimeout, with backoff and keepalive applied.
conn, err := grpc.DialContext(
ctx,
addr,
opt,
grpc.WithBlock(),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(mgr.keepalive),
)
cancel()
if err != nil {
return nil, berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to make connection to store %d", storeID)
}
return conn, nil
}
// GetBackupClient get or create a backup client.
func (mgr *Mgr) GetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error) {
if ctx.Err() != nil {
return nil, errors.Trace(ctx.Err())
var cli backuppb.BackupClient
if err := mgr.WithConn(ctx, storeID, func(cc *grpc.ClientConn) {
cli = backuppb.NewBackupClient(cc)
}); err != nil {
return nil, err
}
mgr.grpcClis.mu.Lock()
defer mgr.grpcClis.mu.Unlock()
if conn, ok := mgr.grpcClis.clis[storeID]; ok {
// Find a cached backup client.
return backuppb.NewBackupClient(conn), nil
}
conn, err := mgr.getGrpcConnLocked(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
// Cache the conn.
mgr.grpcClis.clis[storeID] = conn
return backuppb.NewBackupClient(conn), nil
return cli, nil
}
// ResetBackupClient reset the connection for backup client.
func (mgr *Mgr) ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error) {
if ctx.Err() != nil {
return nil, errors.Trace(ctx.Err())
func (mgr *Mgr) GetLogBackupClient(ctx context.Context, storeID uint64) (logbackup.LogBackupClient, error) {
var cli logbackup.LogBackupClient
if err := mgr.WithConn(ctx, storeID, func(cc *grpc.ClientConn) {
cli = logbackup.NewLogBackupClient(cc)
}); err != nil {
return nil, err
}
mgr.grpcClis.mu.Lock()
defer mgr.grpcClis.mu.Unlock()
if conn, ok := mgr.grpcClis.clis[storeID]; ok {
// Find a cached backup client.
log.Info("Reset backup client", zap.Uint64("storeID", storeID))
err := conn.Close()
if err != nil {
log.Warn("close backup connection failed, ignore it", zap.Uint64("storeID", storeID))
}
delete(mgr.grpcClis.clis, storeID)
}
var (
conn *grpc.ClientConn
err error
)
for retry := 0; retry < resetRetryTimes; retry++ {
conn, err = mgr.getGrpcConnLocked(ctx, storeID)
if err != nil {
log.Warn("failed to reset grpc connection, retry it",
zap.Int("retry time", retry), logutil.ShortError(err))
time.Sleep(time.Duration(retry+3) * time.Second)
continue
}
mgr.grpcClis.clis[storeID] = conn
break
}
if err != nil {
return nil, errors.Trace(err)
}
return backuppb.NewBackupClient(conn), nil
return cli, nil
}
// GetStorage returns a kv storage.
@ -423,7 +262,7 @@ func (mgr *Mgr) GetStorage() kv.Storage {
// GetTLSConfig returns the tls config.
func (mgr *Mgr) GetTLSConfig() *tls.Config {
return mgr.tlsConf
return mgr.StoreManager.TLSConfig()
}
// GetLockResolver gets the LockResolver.
@ -436,17 +275,10 @@ func (mgr *Mgr) GetDomain() *domain.Domain {
return mgr.dom
}
// Close closes all client in Mgr.
func (mgr *Mgr) Close() {
mgr.grpcClis.mu.Lock()
for _, cli := range mgr.grpcClis.clis {
err := cli.Close()
if err != nil {
log.Error("fail to close Mgr", zap.Error(err))
}
if mgr.StoreManager != nil {
mgr.StoreManager.Close()
}
mgr.grpcClis.mu.Unlock()
// Gracefully shutdown domain so it does not affect other TiDB DDL.
// Must close domain before closing storage, otherwise it gets stuck forever.
if mgr.ownsStorage {

View File

@ -14,6 +14,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
@ -269,3 +270,29 @@ func Redact(field zap.Field) zap.Field {
}
return field
}
// StringifyKeys wraps a slice of key ranges into a fmt.Stringer.
type StringifyKeys []kv.KeyRange

// String renders the ranges as "{[start1, end1), [start2, end2), ...}".
// An empty end key is rendered as "inf"; keys go through redaction.
func (kr StringifyKeys) String() string {
sb := new(strings.Builder)
sb.WriteString("{")
for i, rng := range kr {
if i > 0 {
sb.WriteString(", ")
}
sb.WriteString("[")
sb.WriteString(redact.Key(rng.StartKey))
sb.WriteString(", ")
var endKey string
if len(rng.EndKey) == 0 {
endKey = "inf"
} else {
endKey = redact.Key(rng.EndKey)
}
// NOTE(review): endKey is already redacted (or the literal "inf") and is
// passed through redact.String again — confirm the double pass is intended.
sb.WriteString(redact.String(endKey))
sb.WriteString(")")
}
sb.WriteString("}")
return sb.String()
}

View File

@ -23,6 +23,7 @@ import (
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/backup"
"github.com/pingcap/tidb/br/pkg/checksum"
"github.com/pingcap/tidb/br/pkg/conn"
berrors "github.com/pingcap/tidb/br/pkg/errors"
@ -2174,7 +2175,7 @@ func (rc *Client) SaveSchemas(
m.StartVersion = logStartTS
})
schemas := sr.TidyOldSchemas()
schemas := TidyOldSchemas(sr)
schemasConcurrency := uint(mathutil.Min(64, schemas.Len()))
err := schemas.BackupSchemas(ctx, metaWriter, nil, nil, rc.restoreTS, schemasConcurrency, 0, true, nil)
if err != nil {
@ -2191,3 +2192,31 @@ func (rc *Client) SaveSchemas(
// MockClient creates a fake restore Client holding only the given databases,
// for unit testing.
func MockClient(dbs map[string]*utils.Database) *Client {
return &Client{databases: dbs}
}
// TidyOldSchemas gathers the old (pre-rewrite) schema information recorded in
// the SchemasReplace into a backup.Schemas collection.
func TidyOldSchemas(sr *stream.SchemasReplace) *backup.Schemas {
	result := backup.NewBackupSchemas()
	for _, dbReplace := range sr.DbMap {
		if dbReplace.OldDBInfo == nil {
			continue
		}
		tableCount := 0
		for _, tableReplace := range dbReplace.TableMap {
			if tableReplace.OldTableInfo == nil {
				continue
			}
			result.AddSchema(dbReplace.OldDBInfo, tableReplace.OldTableInfo)
			tableCount++
		}
		// Still back up the schema itself when it holds no tables.
		if tableCount == 0 {
			result.AddSchema(dbReplace.OldDBInfo, nil)
		}
	}
	return result
}

View File

@ -234,6 +234,7 @@ func (r *RPCResult) StrategyForRetryGoError() RetryStrategy {
if r.Err == nil {
return StrategyGiveUp
}
// we should unwrap the error or we cannot get the write gRPC status.
if gRPCErr, ok := status.FromError(errors.Cause(r.Err)); ok {
switch gRPCErr.Code() {

View File

@ -22,7 +22,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/backup"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
@ -98,33 +97,6 @@ func NewSchemasReplace(
}
}
// TidyOldSchemas produces schemas information.
// It walks every database/table replace entry and collects the *old*
// (pre-rewrite) schema objects into a backup.Schemas set.
func (sr *SchemasReplace) TidyOldSchemas() *backup.Schemas {
var schemaIsEmpty bool
schemas := backup.NewBackupSchemas()
for _, dr := range sr.DbMap {
// Skip entries that never recorded an old DB info.
if dr.OldDBInfo == nil {
continue
}
schemaIsEmpty = true
for _, tr := range dr.TableMap {
if tr.OldTableInfo == nil {
continue
}
schemas.AddSchema(dr.OldDBInfo, tr.OldTableInfo)
schemaIsEmpty = false
}
// backup this empty schema if it has nothing table.
if schemaIsEmpty {
schemas.AddSchema(dr.OldDBInfo, nil)
}
}
return schemas
}
func (sr *SchemasReplace) rewriteKeyForDB(key []byte, cf string) ([]byte, bool, error) {
rawMetaKey, err := ParseTxnMetaKeyFrom(key)
if err != nil {

View File

@ -7,6 +7,7 @@ import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/stretchr/testify/require"
)
@ -15,7 +16,7 @@ func TestGetCheckpointOfTask(t *testing.T) {
Info: backuppb.StreamBackupTaskInfo{
StartTs: 8,
},
Checkpoints: []stream.Checkpoint{
Checkpoints: []streamhelper.Checkpoint{
{
ID: 1,
TS: 10,

View File

@ -4,6 +4,7 @@ package stream
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
@ -17,12 +18,13 @@ import (
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/storage"
. "github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@ -104,6 +106,9 @@ func (t TaskStatus) GetMinStoreCheckpoint() Checkpoint {
initialized = true
checkpoint = cp
}
if cp.Type() == CheckpointTypeGlobal {
return cp
}
}
return checkpoint
}
@ -131,7 +136,6 @@ func (p *printByTable) AddTask(task TaskStatus) {
info := fmt.Sprintf("%s; gap=%s", pTime, gapColor.Sprint(gap))
return info
}
table.Add("checkpoint[global]", formatTS(task.GetMinStoreCheckpoint().TS))
p.addCheckpoints(&task, table, formatTS)
for store, e := range task.LastErrors {
table.Add(fmt.Sprintf("error[store=%d]", store), e.ErrorCode)
@ -142,11 +146,21 @@ func (p *printByTable) AddTask(task TaskStatus) {
}
func (p *printByTable) addCheckpoints(task *TaskStatus, table *glue.Table, formatTS func(uint64) string) {
for _, cp := range task.Checkpoints {
switch cp.Type() {
case CheckpointTypeStore:
table.Add(fmt.Sprintf("checkpoint[store=%d]", cp.ID), formatTS(cp.TS))
cp := task.GetMinStoreCheckpoint()
items := make([][2]string, 0, len(task.Checkpoints))
if cp.Type() != CheckpointTypeGlobal {
for _, cp := range task.Checkpoints {
switch cp.Type() {
case CheckpointTypeStore:
items = append(items, [2]string{fmt.Sprintf("checkpoint[store=%d]", cp.ID), formatTS(cp.TS)})
}
}
} else {
items = append(items, [2]string{"checkpoint[central-global]", formatTS(cp.TS)})
}
for _, item := range items {
table.Add(item[0], item[1])
}
}
@ -241,10 +255,15 @@ func (p *printByJSON) PrintTasks() {
var logCountSumRe = regexp.MustCompile(`tikv_stream_handle_kv_batch_sum ([0-9]+)`)
type PDInfoProvider interface {
GetPDClient() pd.Client
GetTLSConfig() *tls.Config
}
// MaybeQPS get a number like the QPS of last seconds for each store via the prometheus interface.
// TODO: this is a temporary solution(aha, like in a Hackthon),
// we MUST find a better way for providing this information.
func MaybeQPS(ctx context.Context, mgr *conn.Mgr) (float64, error) {
func MaybeQPS(ctx context.Context, mgr PDInfoProvider) (float64, error) {
c := mgr.GetPDClient()
prefix := "http://"
if mgr.GetTLSConfig() != nil {
@ -316,12 +335,12 @@ func MaybeQPS(ctx context.Context, mgr *conn.Mgr) (float64, error) {
// StatusController is the controller type (or context type) for the command `stream status`.
type StatusController struct {
meta *MetaDataClient
mgr *conn.Mgr
mgr PDInfoProvider
view TaskPrinter
}
// NewStatusContorller make a status controller via some resource accessors.
func NewStatusController(meta *MetaDataClient, mgr *conn.Mgr, view TaskPrinter) *StatusController {
func NewStatusController(meta *MetaDataClient, mgr PDInfoProvider, view TaskPrinter) *StatusController {
return &StatusController{
meta: meta,
mgr: mgr,

View File

@ -0,0 +1,514 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
package streamhelper
import (
"bytes"
"context"
"math"
"reflect"
"sort"
"strings"
"sync"
"time"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// CheckpointAdvancer is the central node for advancing the checkpoint of log backup.
// It's a part of "checkpoint v3".
// Generally, it scan the regions in the task range, collect checkpoints from tikvs.
// ┌──────┐
// ┌────►│ TiKV │
// │ └──────┘
// │
// │
// ┌──────────┐GetLastFlushTSOfRegion│ ┌──────┐
// │ Advancer ├──────────────────────┼────►│ TiKV │
// └────┬─────┘ │ └──────┘
// │ │
// │ │
// │ │ ┌──────┐
// │ └────►│ TiKV │
// │ └──────┘
// │
// │ UploadCheckpointV3 ┌──────────────────┐
// └─────────────────────►│ PD │
// └──────────────────┘
type CheckpointAdvancer struct {
// env provides region scanning, log backup access and task metadata.
env Env

// The concurrently accessed task:
// both by the task listener and ticking. Guarded by taskMu.
task *backuppb.StreamBackupTaskInfo
taskMu sync.Mutex

// the read-only config.
// once tick begins, this should not be changed for now.
cfg config.Config

// the cache of region checkpoints.
// so we can advance only ranges with huge gap.
cache CheckpointsCache

// the internal state of advancer.
state advancerState

// the cached last checkpoint.
// if no progress, this cache helps us avoid sending useless requests.
lastCheckpoint uint64
}
// advancerState is the sealed type for the state of advancer.
// the advancer has two stages: full scan and update small tree.
type advancerState interface {
// Note:
// Go doesn't support sealed classes or ADTs currently.
// (it can only be used at generic constraints...)
// Leave it empty for now.
// ~*fullScan | ~*updateSmallTree
}

// fullScan is the initial state of advancer.
// in this stage, we would "fill" the cache:
// insert ranges whose union becomes the full range of the task.
type fullScan struct {
// fullScanTick counts down the ticks to skip before the next full scan.
fullScanTick int
}

// updateSmallTree is the "incremental stage" of advancer.
// we have built a "filled" cache, and we can pop a subrange of it,
// trying to advance the checkpoint of those ranges.
type updateSmallTree struct {
// consistencyCheckTick counts down the ticks between cache consistency checks.
consistencyCheckTick int
}
// NewCheckpointAdvancer creates a checkpoint advancer with the env,
// using the default config, an empty cache and the full-scan initial state.
func NewCheckpointAdvancer(env Env) *CheckpointAdvancer {
	adv := &CheckpointAdvancer{
		env:   env,
		cfg:   config.Default(),
		cache: NewCheckpoints(),
		state: &fullScan{},
	}
	return adv
}
// disableCache removes the cache and resets to the full-scan state.
// note this won't lock the checkpoint advancer at `fullScan` state forever,
// you may need to change the config `AdvancingByCache`.
func (c *CheckpointAdvancer) disableCache() {
	c.cache = NoOPCheckpointCache{}
	// Must be a pointer: tick() dispatches via `case *fullScan:`; a plain
	// fullScan{} value would fall into its "Unknown state type" branch and
	// the advancer would stop ticking.
	c.state = &fullScan{}
}

// enableCache enables the cache and restarts from the full-scan state.
// also check `AdvancingByCache` in the config.
func (c *CheckpointAdvancer) enableCache() {
	c.cache = NewCheckpoints()
	c.state = &fullScan{}
}
// UpdateConfig updates the config for the advancer.
// Note this should be called before starting the loop, because there isn't locks,
// TODO: support updating config when advancer starts working.
// (Maybe by applying changes at begin of ticking, and add locks.)
func (c *CheckpointAdvancer) UpdateConfig(newConf config.Config) {
	cacheToggled := c.cfg.AdvancingByCache != newConf.AdvancingByCache
	c.cfg = newConf
	if !cacheToggled {
		return
	}
	// The caching switch flipped: rebuild or drop the cache accordingly.
	if newConf.AdvancingByCache {
		c.enableCache()
	} else {
		c.disableCache()
	}
}
// UpdateConfigWith updates the config by applying the mutator f to a copy of
// the current config and installing the result via UpdateConfig.
func (c *CheckpointAdvancer) UpdateConfigWith(f func(*config.Config)) {
	newCfg := c.cfg
	f(&newCfg)
	c.UpdateConfig(newCfg)
}
// Config returns a copy of the advancer's current configuration.
func (c *CheckpointAdvancer) Config() config.Config {
	return c.cfg
}
// GetCheckpointInRange scans every region overlapping [start, end) and feeds
// each one into the collector, which gathers their checkpoints.
func (c *CheckpointAdvancer) GetCheckpointInRange(ctx context.Context, start, end []byte, collector *clusterCollector) error {
	log.Debug("scanning range", logutil.Key("start", start), logutil.Key("end", end))
	regionIter := IterateRegion(c.env, start, end)
	for !regionIter.Done() {
		regions, err := regionIter.Next(ctx)
		if err != nil {
			return err
		}
		log.Debug("scan region", zap.Int("len", len(regions)))
		for _, region := range regions {
			if err := collector.collectRegion(region); err != nil {
				log.Warn("meet error during getting checkpoint", logutil.ShortError(err))
				return err
			}
		}
	}
	return nil
}
// recordTimeCost returns a closure that, when invoked, records the elapsed
// time since recordTimeCost was called into metrics and a debug log entry.
func (c *CheckpointAdvancer) recordTimeCost(message string, fields ...zap.Field) func() {
	begin := time.Now()
	label := strings.ReplaceAll(message, " ", "-")
	return func() {
		elapsed := time.Since(begin)
		fields = append(fields, zap.Stringer("take", elapsed))
		metrics.AdvancerTickDuration.WithLabelValues(label).Observe(elapsed.Seconds())
		log.Debug(message, fields...)
	}
}
// tryAdvance tries to advance the checkpoint ts of a set of ranges which shares the same checkpoint.
// On any failure (including panics) the ranges are re-inserted into the
// cache so a later tick can retry them.
func (c *CheckpointAdvancer) tryAdvance(ctx context.Context, rst RangesSharesTS) (err error) {
// Defers run LIFO: the panic-to-error conversion runs first, then the
// reinsertion-on-error, then the time-cost recording.
defer c.recordTimeCost("try advance", zap.Uint64("checkpoint", rst.TS), zap.Int("len", len(rst.Ranges)))()
defer func() {
if err != nil {
c.cache.InsertRanges(rst)
}
}()
defer utils.PanicToErr(&err)
// Merge overlapping/adjacent ranges before querying the cluster.
ranges := CollapseRanges(len(rst.Ranges), func(i int) kv.KeyRange { return rst.Ranges[i] })
workers := utils.NewWorkerPool(4, "sub ranges")
eg, cx := errgroup.WithContext(ctx)
collector := NewClusterCollector(ctx, c.env)
// Successfully collected ranges go straight back into the cache.
collector.setOnSuccessHook(c.cache.InsertRange)
for _, r := range ranges {
r := r
workers.ApplyOnErrorGroup(eg, func() (e error) {
defer c.recordTimeCost("get regions in range", zap.Uint64("checkpoint", rst.TS))()
defer utils.PanicToErr(&e)
return c.GetCheckpointInRange(cx, r.StartKey, r.EndKey, collector)
})
}
err = eg.Wait()
if err != nil {
return err
}
result, err := collector.Finish(ctx)
if err != nil {
return err
}
// Ranges that failed to report are cached again with the old TS for retry.
fr := result.FailureSubRanges
if len(fr) != 0 {
log.Debug("failure regions collected", zap.Int("size", len(fr)))
c.cache.InsertRanges(RangesSharesTS{
TS: rst.TS,
Ranges: fr,
})
}
return nil
}
// CalculateGlobalCheckpointLight tries to advance the global checkpoint by the cache.
// Only the cached ranges whose checkpoint lags by more than
// DefaultTryAdvanceThreshold are popped and retried.
// Returns 0 with a nil error when no range needs advancing.
// NOTE(review): that 0 return reaches advanceCheckpointBy, where it reads as
// a stale checkpoint and triggers its warning — confirm this is intended.
func (c *CheckpointAdvancer) CalculateGlobalCheckpointLight(ctx context.Context) (uint64, error) {
log.Info("advancer with cache: current tree", zap.Stringer("ct", c.cache))
rsts := c.cache.PopRangesWithGapGT(config.DefaultTryAdvanceThreshold)
if len(rsts) == 0 {
return 0, nil
}
// Advance each popped range group concurrently, bounded by the worker pool.
workers := utils.NewWorkerPool(uint(config.DefaultMaxConcurrencyAdvance), "regions")
eg, cx := errgroup.WithContext(ctx)
for _, rst := range rsts {
rst := rst
workers.ApplyOnErrorGroup(eg, func() (err error) {
return c.tryAdvance(cx, *rst)
})
}
err := eg.Wait()
if err != nil {
return 0, err
}
log.Info("advancer with cache: new tree", zap.Stringer("cache", c.cache))
ts := c.cache.CheckpointTS()
return ts, nil
}
// CalculateGlobalCheckpoint calculates the global checkpoint, which won't use the cache.
// It repeatedly scans the task range, retrying only the subranges that
// failed, until every region reported a checkpoint; the overall attempt is
// bounded by cfg.MaxBackoffTime via cx's deadline.
func (c *CheckpointAdvancer) CalculateGlobalCheckpoint(ctx context.Context) (uint64, error) {
	var (
		cp = uint64(math.MaxInt64)
		// TODO: Use The task range here.
		thisRun []kv.KeyRange = []kv.KeyRange{{}}
		nextRun []kv.KeyRange
	)
	// recordTimeCost returns the closure that records the cost; it must be
	// invoked by the defer (note the trailing "()"), matching every other
	// call site — otherwise nothing would ever be observed.
	defer c.recordTimeCost("record all")()
	cx, cancel := context.WithTimeout(ctx, c.cfg.MaxBackoffTime)
	defer cancel()
	for {
		coll := NewClusterCollector(ctx, c.env)
		coll.setOnSuccessHook(c.cache.InsertRange)
		for _, u := range thisRun {
			err := c.GetCheckpointInRange(cx, u.StartKey, u.EndKey, coll)
			if err != nil {
				return 0, err
			}
		}
		result, err := coll.Finish(ctx)
		if err != nil {
			return 0, err
		}
		log.Debug("full: a run finished", zap.Any("checkpoint", result))
		nextRun = append(nextRun, result.FailureSubRanges...)
		// Keep the minimum checkpoint over all scanned regions.
		if cp > result.Checkpoint {
			cp = result.Checkpoint
		}
		if len(nextRun) == 0 {
			return cp, nil
		}
		// Retry only the failed subranges after a backoff.
		thisRun = nextRun
		nextRun = nil
		log.Debug("backoffing with subranges", zap.Int("subranges", len(thisRun)))
		time.Sleep(c.cfg.BackoffTime)
	}
}
// CollapseRanges collapse ranges overlapping or adjacent.
// Example:
// CollapseRanges({[1, 4], [2, 8], [3, 9]}) == {[1, 9]}
// CollapseRanges({[1, 3], [4, 7], [2, 3]}) == {[1, 3], [4, 7]}
func CollapseRanges(length int, getRange func(int) kv.KeyRange) []kv.KeyRange {
	sorted := make([]kv.KeyRange, 0, length)
	for idx := 0; idx < length; idx++ {
		sorted = append(sorted, getRange(idx))
	}
	sort.Slice(sorted, func(i, j int) bool {
		return bytes.Compare(sorted[i].StartKey, sorted[j].StartKey) < 0
	})
	merged := make([]kv.KeyRange, 0, len(sorted))
	idx := 0
	for idx < len(sorted) {
		current := sorted[idx]
		for {
			idx++
			if idx >= len(sorted) {
				break
			}
			// An empty end key means "+inf", so everything after is absorbed.
			if len(current.EndKey) != 0 && bytes.Compare(sorted[idx].StartKey, current.EndKey) > 0 {
				break
			}
			// Extend the end when the absorbed range reaches further
			// (an empty end key always reaches furthest).
			if (len(current.EndKey) != 0 && bytes.Compare(current.EndKey, sorted[idx].EndKey) < 0) || len(sorted[idx].EndKey) == 0 {
				current.EndKey = sorted[idx].EndKey
			}
		}
		merged = append(merged, current)
	}
	return merged
}
// consumeAllTask drains every event currently buffered in ch without
// blocking, applying each to the advancer. It returns on the first error,
// when the channel closes, or when no more events are pending.
func (c *CheckpointAdvancer) consumeAllTask(ctx context.Context, ch <-chan TaskEvent) error {
for {
select {
case e, ok := <-ch:
if !ok {
return nil
}
log.Info("meet task event", zap.Stringer("event", &e))
if err := c.onTaskEvent(e); err != nil {
// NOTE(review): this checks e.Err (the event's own error) rather than
// the err returned by onTaskEvent — confirm the asymmetry is intended.
if errors.Cause(e.Err) != context.Canceled {
log.Error("listen task meet error, would reopen.", logutil.ShortError(err))
return err
}
return nil
}
default:
// Channel momentarily empty: everything buffered has been consumed.
return nil
}
}
}
// beginListenTaskChange bootstraps the initial task set,
// and returns a channel respecting the change of tasks.
func (c *CheckpointAdvancer) beginListenTaskChange(ctx context.Context) (<-chan TaskEvent, error) {
	events := make(chan TaskEvent, 1024)
	if err := c.env.Begin(ctx, events); err != nil {
		return nil, err
	}
	// Apply everything that was delivered during bootstrap right away.
	if err := c.consumeAllTask(ctx, events); err != nil {
		return nil, err
	}
	return events, nil
}
// StartTaskListener starts the task listener for the advancer.
// When no task detected, advancer would do nothing, please call this before begin the tick loop.
func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) {
cx, cancel := context.WithCancel(ctx)
var ch <-chan TaskEvent
// Retry opening the task watcher until it succeeds or ctx is cancelled.
for {
if cx.Err() != nil {
// make linter happy.
cancel()
return
}
var err error
ch, err = c.beginListenTaskChange(cx)
if err == nil {
break
}
log.Warn("failed to begin listening, retrying...", logutil.ShortError(err))
time.Sleep(c.cfg.BackoffTime)
}
// Consume task events in the background until ctx is done or the watcher
// fails; on non-cancellation failures the whole listener is re-registered
// after a backoff, which reopens the channel from scratch.
go func() {
defer cancel()
for {
select {
case <-ctx.Done():
return
case e, ok := <-ch:
if !ok {
return
}
log.Info("meet task event", zap.Stringer("event", &e))
if err := c.onTaskEvent(e); err != nil {
// NOTE(review): this checks e.Err rather than the err returned
// above — confirm the asymmetry is intended.
if errors.Cause(e.Err) != context.Canceled {
log.Error("listen task meet error, would reopen.", logutil.ShortError(err))
time.AfterFunc(c.cfg.BackoffTime, func() { c.StartTaskListener(ctx) })
}
return
}
}
}
}()
}
// onTaskEvent applies one task event to the advancer under taskMu.
// Add installs the task; Del drops it and resets to the full-scan state;
// Err simply propagates the event's error to the caller.
func (c *CheckpointAdvancer) onTaskEvent(e TaskEvent) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
switch e.Type {
case EventAdd:
c.task = e.Info
case EventDel:
// The task is gone: clear everything and restart from a full scan.
c.task = nil
c.state = &fullScan{}
c.cache.Clear()
case EventErr:
return e.Err
}
return nil
}
// advanceCheckpointBy advances the checkpoint by a checkpoint getter function.
// The new value is only uploaded when strictly greater than the cached last
// checkpoint; on success the metric gauge is updated too.
func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpoint func(context.Context) (uint64, error)) error {
start := time.Now()
cp, err := getCheckpoint(ctx)
if err != nil {
return err
}
// A smaller value than the cached checkpoint indicates a stale computation.
if cp < c.lastCheckpoint {
log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint), zap.Uint64("new", cp))
}
// No progress (including the stale case above): nothing to upload.
if cp <= c.lastCheckpoint {
return nil
}
// NOTE(review): the two fields below share the zap key "checkpoint"
// (time form and raw TS) — confirm the duplicate key is intended.
log.Info("uploading checkpoint for task",
zap.Stringer("checkpoint", oracle.GetTimeFromTS(cp)),
zap.Uint64("checkpoint", cp),
zap.String("task", c.task.Name),
zap.Stringer("take", time.Since(start)))
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, cp); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
}
c.lastCheckpoint = cp
metrics.LastCheckpoint.WithLabelValues(c.task.GetName()).Set(float64(c.lastCheckpoint))
return nil
}
// OnTick advances the inner logic clock for the advancer.
// It's synchronous: this would only return after the events triggered by the clock has all been done.
// It's generally panic-free, you may not need to trying recover a panic here.
func (c *CheckpointAdvancer) OnTick(ctx context.Context) (err error) {
defer c.recordTimeCost("tick")()
// Convert any panic raised by the tick into an error so the caller's loop
// survives.
defer func() {
e := recover()
if e != nil {
log.Error("panic during handing tick", zap.Stack("stack"), logutil.ShortError(err))
err = errors.Annotatef(berrors.ErrUnknown, "panic during handling tick: %s", e)
}
}()
err = c.tick(ctx)
return
}
// onConsistencyCheckTick runs the periodic consistency check of the cache.
// It only performs the check once the per-state tick counter reaches zero,
// and rolls the advancer back to the full-scan state when the check fails.
func (c *CheckpointAdvancer) onConsistencyCheckTick(s *updateSmallTree) error {
	// Not yet time for a check: just count the tick down.
	if s.consistencyCheckTick > 0 {
		s.consistencyCheckTick--
		return nil
	}
	defer c.recordTimeCost("consistency check")()
	if err := c.cache.ConsistencyCheck(); err != nil {
		log.Error("consistency check failed! log backup may lose data! rolling back to full scan for saving.", logutil.ShortError(err))
		c.state = &fullScan{}
		return err
	}
	log.Debug("consistency check passed.")
	s.consistencyCheckTick = config.DefaultConsistencyCheckTick
	return nil
}
// tick dispatches one logical clock tick according to the advancer's state:
//   - *fullScan: every cfg.FullScanTick ticks, compute the global checkpoint
//     from scratch, then switch to the cached mode when enabled;
//   - *updateSmallTree: run the periodic consistency check, then advance
//     using only the cached ranges with a large enough gap.
// taskMu is held for the whole tick; panics are handled by OnTick.
func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
switch s := c.state.(type) {
case *fullScan:
if s.fullScanTick > 0 {
s.fullScanTick--
break
}
if c.task == nil {
log.Debug("No tasks yet, skipping advancing.")
return nil
}
// Re-arm the countdown even when the scan below fails.
defer func() {
s.fullScanTick = c.cfg.FullScanTick
}()
err := c.advanceCheckpointBy(ctx, c.CalculateGlobalCheckpoint)
if err != nil {
return err
}
// The cache has been filled by the full scan: switch to the light mode.
if c.cfg.AdvancingByCache {
c.state = &updateSmallTree{}
}
case *updateSmallTree:
if err := c.onConsistencyCheckTick(s); err != nil {
return err
}
err := c.advanceCheckpointBy(ctx, c.CalculateGlobalCheckpointLight)
if err != nil {
return err
}
default:
log.Error("Unknown state type, skipping tick", zap.Stringer("type", reflect.TypeOf(c.state)))
}
return nil
}

View File

@ -0,0 +1,81 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
package streamhelper
import (
"context"
"time"
"github.com/google/uuid"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
const (
// ownerPrompt is the prompt string passed to the owner manager.
ownerPrompt = "log-backup"
// ownerPath is the etcd key path used for the owner election.
ownerPath = "/tidb/br-stream/owner"
)
// AdvancerDaemon is a "high-availability" version of advancer.
// It involves the manager for electing an owner and doing things.
// You can embed it into your code by simply call:
//
// ad := NewAdvancerDaemon(adv, mgr)
// loop, err := ad.Begin(ctx)
// if err != nil {
// return err
// }
// loop()
type AdvancerDaemon struct {
// adv is the wrapped checkpoint advancer, driven only while owner.
adv *CheckpointAdvancer
// manager runs the owner election deciding whether this node may act.
manager owner.Manager
}
// NewAdvancerDaemon wraps the advancer with the given owner manager,
// producing a daemon that only acts while it holds the ownership.
func NewAdvancerDaemon(adv *CheckpointAdvancer, manager owner.Manager) *AdvancerDaemon {
	daemon := &AdvancerDaemon{adv: adv, manager: manager}
	return daemon
}
// OwnerManagerForLogBackup creates an owner manager for the log backup
// election, identified by a freshly generated UUID.
func OwnerManagerForLogBackup(ctx context.Context, etcdCli *clientv3.Client) owner.Manager {
	memberID := uuid.New().String()
	return owner.NewOwnerManager(ctx, etcdCli, ownerPrompt, memberID, ownerPath)
}
// Begin starts the daemon.
// It would do some bootstrap task, and return a closure that would begin the main loop.
func (ad *AdvancerDaemon) Begin(ctx context.Context) (func(), error) {
	log.Info("begin advancer daemon", zap.String("id", ad.manager.ID()))
	if err := ad.manager.CampaignOwner(); err != nil {
		return nil, err
	}
	ad.adv.StartTaskListener(ctx)
	tick := time.NewTicker(ad.adv.cfg.TickDuration)
	loop := func() {
		// Release the ticker once the loop exits, otherwise it would leak.
		defer tick.Stop()
		log.Info("begin advancer daemon loop", zap.String("id", ad.manager.ID()))
		for {
			select {
			case <-ctx.Done():
				log.Info("advancer loop exits", zap.String("id", ad.manager.ID()))
				return
			case <-tick.C:
				log.Debug("deamon tick start", zap.Bool("is-owner", ad.manager.IsOwner()))
				// Only the elected owner actually advances the checkpoint;
				// the gauge records which node currently owns the election.
				if ad.manager.IsOwner() {
					metrics.AdvancerOwner.Set(1.0)
					if err := ad.adv.OnTick(ctx); err != nil {
						log.Warn("failed on tick", logutil.ShortError(err))
					}
				} else {
					metrics.AdvancerOwner.Set(0.0)
				}
			}
		}
	}
	return loop, nil
}

View File

@ -0,0 +1,107 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
package streamhelper
import (
"context"
"time"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/config"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
// Env is the interface required by the advancer.
type Env interface {
	// The region scanner provides the region information.
	RegionScanner
	// LogBackupService connects to the TiKV, so we can collect the region checkpoints.
	LogBackupService
	// StreamMeta connects to the metadata service (normally PD).
	StreamMeta
}

// PDRegionScanner is a simple wrapper over PD
// to adapt the requirement of `RegionScan`.
type PDRegionScanner struct {
	pd.Client
}
// RegionScan gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
func (c PDRegionScanner) RegionScan(ctx context.Context, key []byte, endKey []byte, limit int) ([]RegionWithLeader, error) {
	regions, err := c.Client.ScanRegions(ctx, key, endKey, limit)
	if err != nil {
		return nil, err
	}
	// Re-shape PD's region info into the advancer's lightweight view.
	withLeaders := make([]RegionWithLeader, 0, len(regions))
	for _, info := range regions {
		entry := RegionWithLeader{
			Region: info.Meta,
			Leader: info.Leader,
		}
		withLeaders = append(withLeaders, entry)
	}
	return withLeaders, nil
}
// clusterEnv is the environment for running in the real cluster.
type clusterEnv struct {
	// clis manages gRPC connections to the TiKV stores.
	clis *utils.StoreManager
	// TaskEventClient implements StreamMeta (task events via etcd).
	*TaskEventClient
	// PDRegionScanner implements RegionScanner over the PD client.
	PDRegionScanner
}
// GetLogBackupClient gets the log backup client.
func (t clusterEnv) GetLogBackupClient(ctx context.Context, storeID uint64) (logbackup.LogBackupClient, error) {
	var client logbackup.LogBackupClient
	// Borrow the (cached) connection of the store and build a client over it.
	if err := t.clis.WithConn(ctx, storeID, func(conn *grpc.ClientConn) {
		client = logbackup.NewLogBackupClient(conn)
	}); err != nil {
		return nil, err
	}
	return client, nil
}
// CliEnv creates the Env for CLI usage.
func CliEnv(cli *utils.StoreManager, etcdCli *clientv3.Client) Env {
	env := clusterEnv{
		clis:            cli,
		TaskEventClient: &TaskEventClient{MetaDataClient: *NewMetaDataClient(etcdCli)},
		PDRegionScanner: PDRegionScanner{Client: cli.PDClient()},
	}
	return env
}
// TiDBEnv creates the Env by TiDB config.
func TiDBEnv(pdCli pd.Client, etcdCli *clientv3.Client, conf *config.Config) (Env, error) {
	tlsConf, err := conf.GetTiKVConfig().Security.ToTLSConfig()
	if err != nil {
		return nil, err
	}
	// Reuse TiDB's TiKV-client keepalive settings for the store connections.
	keepAlive := keepalive.ClientParameters{
		Time:    time.Duration(conf.TiKVClient.GrpcKeepAliveTime) * time.Second,
		Timeout: time.Duration(conf.TiKVClient.GrpcKeepAliveTimeout) * time.Second,
	}
	env := clusterEnv{
		clis:            utils.NewStoreManager(pdCli, keepAlive, tlsConf),
		TaskEventClient: &TaskEventClient{MetaDataClient: *NewMetaDataClient(etcdCli)},
		PDRegionScanner: PDRegionScanner{Client: pdCli},
	}
	return env, nil
}
// LogBackupService connects to the TiKV log backup service,
// so we can collect the region checkpoints.
type LogBackupService interface {
	// GetLogBackupClient gets the log backup client.
	GetLogBackupClient(ctx context.Context, storeID uint64) (logbackup.LogBackupClient, error)
}

// StreamMeta connects to the metadata service (normally PD).
// It provides the global checkpoint information.
type StreamMeta interface {
	// Begin begins listen the task event change.
	Begin(ctx context.Context, ch chan<- TaskEvent) error
	// UploadV3GlobalCheckpointForTask uploads the global checkpoint to the meta store.
	UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
}

View File

@ -0,0 +1,185 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
package streamhelper_test
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// TestBasic checks that a cluster-wide collection over the whole key space
// yields exactly the minimal checkpoint among all regions, with no failures.
func TestBasic(t *testing.T) {
	c := createFakeCluster(t, 4, false)
	defer func() {
		fmt.Println(c)
	}()
	c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
	ctx := context.Background()
	minCheckpoint := c.advanceCheckpoints()
	env := &testEnv{fakeCluster: c, testCtx: t}
	adv := streamhelper.NewCheckpointAdvancer(env)
	coll := streamhelper.NewClusterCollector(ctx, env)
	err := adv.GetCheckpointInRange(ctx, []byte{}, []byte{}, coll)
	require.NoError(t, err)
	r, err := coll.Finish(ctx)
	require.NoError(t, err)
	require.Len(t, r.FailureSubRanges, 0)
	require.Equal(t, r.Checkpoint, minCheckpoint, "%d %d", r.Checkpoint, minCheckpoint)
}

// TestTick checks the advancer follows the cluster checkpoint on every tick
// (full-scan backoff disabled so each tick does a full scan).
func TestTick(t *testing.T) {
	c := createFakeCluster(t, 4, false)
	defer func() {
		fmt.Println(c)
	}()
	c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	env := &testEnv{fakeCluster: c, testCtx: t}
	adv := streamhelper.NewCheckpointAdvancer(env)
	adv.StartTaskListener(ctx)
	// FullScanTick = 0 removes the backoff between full scans.
	adv.UpdateConfigWith(func(cac *config.Config) {
		cac.FullScanTick = 0
	})
	require.NoError(t, adv.OnTick(ctx))
	for i := 0; i < 5; i++ {
		cp := c.advanceCheckpoints()
		require.NoError(t, adv.OnTick(ctx))
		require.Equal(t, env.getCheckpoint(), cp)
	}
}
// TestWithFailure checks that with the flush simulator enabled, the global
// checkpoint lags behind while some stores haven't flushed, and catches up
// once every store has flushed.
func TestWithFailure(t *testing.T) {
	log.SetLevel(zapcore.DebugLevel)
	c := createFakeCluster(t, 4, true)
	defer func() {
		fmt.Println(c)
	}()
	c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
	c.flushAll()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	env := &testEnv{fakeCluster: c, testCtx: t}
	adv := streamhelper.NewCheckpointAdvancer(env)
	adv.StartTaskListener(ctx)
	adv.UpdateConfigWith(func(cac *config.Config) {
		cac.FullScanTick = 0
	})
	require.NoError(t, adv.OnTick(ctx))
	cp := c.advanceCheckpoints()
	// Flush only one (arbitrary) store: the checkpoint must not fully advance.
	for _, v := range c.stores {
		v.flush()
		break
	}
	require.NoError(t, adv.OnTick(ctx))
	require.Less(t, env.getCheckpoint(), cp, "%d %d", env.getCheckpoint(), cp)
	// After all stores flush, the checkpoint catches up.
	for _, v := range c.stores {
		v.flush()
	}
	require.NoError(t, adv.OnTick(ctx))
	require.Equal(t, env.getCheckpoint(), cp)
}

// shouldFinishInTime fails the test when f doesn't return within d.
// Note: on timeout the spawned goroutine keeps running in the background.
func shouldFinishInTime(t *testing.T, d time.Duration, name string, f func()) {
	ch := make(chan struct{})
	go func() {
		f()
		close(ch)
	}()
	select {
	case <-time.After(d):
		t.Fatalf("%s should finish in %s, but not", name, d)
	case <-ch:
	}
}
// TestCollectorFailure checks that when every store client fails, the
// collector's Finish reports the error instead of hanging.
func TestCollectorFailure(t *testing.T) {
	log.SetLevel(zapcore.DebugLevel)
	c := createFakeCluster(t, 4, true)
	c.onGetClient = func(u uint64) error {
		return status.Error(codes.DataLoss,
			"Exiled requests from the client, please slow down and listen a story: "+
				"the server has been dropped, we are longing for new nodes, however the goddess(k8s) never allocates new resource. "+
				"May you take the sword named `vim`, refactoring the definition of the nature, in the yaml file hidden at somewhere of the cluster, "+
				"to save all of us and gain the response you desiring?")
	}
	ctx := context.Background()
	// Many regions so the collectors actually batch and send.
	splitKeys := make([]string, 0, 10000)
	for i := 0; i < 10000; i++ {
		splitKeys = append(splitKeys, fmt.Sprintf("%04d", i))
	}
	c.splitAndScatter(splitKeys...)
	env := &testEnv{fakeCluster: c, testCtx: t}
	adv := streamhelper.NewCheckpointAdvancer(env)
	coll := streamhelper.NewClusterCollector(ctx, env)
	shouldFinishInTime(t, 30*time.Second, "scan with always fail", func() {
		// At this time, the sending may or may not fail because the sending and batching is doing asynchronously.
		_ = adv.GetCheckpointInRange(ctx, []byte{}, []byte{}, coll)
		// ...but this must fail, not getting stuck.
		_, err := coll.Finish(ctx)
		require.Error(t, err)
	})
}

// oneStoreFailure returns an onGetClient hook that fails for exactly one
// store: the first store requested becomes the permanent victim.
func oneStoreFailure() func(uint64) error {
	victim := uint64(0)
	mu := new(sync.Mutex)
	return func(u uint64) error {
		mu.Lock()
		defer mu.Unlock()
		if victim == 0 {
			victim = u
		}
		if victim == u {
			return status.Error(codes.NotFound,
				"The place once lit by the warm lamplight has been swallowed up by the debris now.")
		}
		return nil
	}
}
// TestOneStoreFailure checks that a single unreachable store makes every
// tick fail (the checkpoint cannot advance safely), and that ticks recover
// once the store is reachable again.
func TestOneStoreFailure(t *testing.T) {
	log.SetLevel(zapcore.DebugLevel)
	c := createFakeCluster(t, 4, true)
	ctx := context.Background()
	splitKeys := make([]string, 0, 1000)
	for i := 0; i < 1000; i++ {
		splitKeys = append(splitKeys, fmt.Sprintf("%04d", i))
	}
	c.splitAndScatter(splitKeys...)
	c.flushAll()
	env := &testEnv{fakeCluster: c, testCtx: t}
	adv := streamhelper.NewCheckpointAdvancer(env)
	adv.StartTaskListener(ctx)
	require.NoError(t, adv.OnTick(ctx))
	c.onGetClient = oneStoreFailure()
	// Every tick must surface the victim store's error.
	for i := 0; i < 100; i++ {
		c.advanceCheckpoints()
		c.flushAll()
		require.ErrorContains(t, adv.OnTick(ctx), "the warm lamplight")
	}
	// Recovery: remove the failure and the checkpoint catches up.
	c.onGetClient = nil
	cp := c.advanceCheckpoints()
	c.flushAll()
	require.NoError(t, adv.OnTick(ctx))
	require.Equal(t, cp, env.checkpoint)
}

View File

@ -0,0 +1,432 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
package streamhelper_test
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math"
"math/rand"
"sort"
"strings"
"sync"
"testing"
backup "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/errorpb"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/kv"
"google.golang.org/grpc"
)
// flushSimulator emulates TiKV's flush behavior: a region's checkpoint is
// only observable after it has been flushed at the current epoch.
type flushSimulator struct {
	// flushedEpoch is the epoch at the last flush; 0 means never flushed.
	flushedEpoch uint64
	// enabled toggles the simulation; when false, makeError never fails.
	enabled bool
}

// makeError returns the per-region error a checkpoint query would get,
// or nil when the region is queryable at requestedEpoch.
func (c flushSimulator) makeError(requestedEpoch uint64) *errorpb.Error {
	if !c.enabled {
		return nil
	}
	if c.flushedEpoch == 0 {
		e := errorpb.Error{
			Message: "not flushed",
		}
		return &e
	}
	if c.flushedEpoch != requestedEpoch {
		e := errorpb.Error{
			Message: "flushed epoch not match",
		}
		return &e
	}
	return nil
}

// fork derives a simulator for a new epoch: same enablement, flush state reset.
func (c flushSimulator) fork() flushSimulator {
	return flushSimulator{
		enabled: c.enabled,
	}
}

// region is an in-memory stand-in for a TiKV region.
type region struct {
	rng        kv.KeyRange
	leader     uint64 // store ID of the leader
	epoch      uint64
	id         uint64
	checkpoint uint64
	fsim       flushSimulator
}

// fakeStore is an in-memory TiKV store; it also implements the parts of
// logbackup.LogBackupClient the advancer uses.
type fakeStore struct {
	id      uint64
	regions map[uint64]*region
}

// fakeCluster is an in-memory cluster of fakeStores, used as the advancer Env.
type fakeCluster struct {
	mu        sync.Mutex
	idAlloced uint64
	stores    map[uint64]*fakeStore
	regions   []*region
	testCtx   *testing.T
	// onGetClient, when set, can inject errors into GetLogBackupClient.
	onGetClient func(uint64) error
}
// overlaps reports whether two key ranges intersect.
// An empty EndKey means the range is unbounded to the right.
func overlaps(a, b kv.KeyRange) bool {
	if len(b.EndKey) == 0 {
		return len(a.EndKey) == 0 || bytes.Compare(a.EndKey, b.StartKey) > 0
	}
	if len(a.EndKey) == 0 {
		return len(b.EndKey) == 0 || bytes.Compare(b.EndKey, a.StartKey) > 0
	}
	return bytes.Compare(a.StartKey, b.EndKey) < 0 && bytes.Compare(b.StartKey, a.EndKey) < 0
}

// splitAt splits the region at key k, returning the new right half with id
// newID. Both halves bump their epoch and reset their flush state.
func (f *region) splitAt(newID uint64, k string) *region {
	newRegion := &region{
		rng:        kv.KeyRange{StartKey: []byte(k), EndKey: f.rng.EndKey},
		leader:     f.leader,
		epoch:      f.epoch + 1,
		id:         newID,
		checkpoint: f.checkpoint,
		fsim:       f.fsim.fork(),
	}
	f.rng.EndKey = []byte(k)
	f.epoch += 1
	f.fsim = f.fsim.fork()
	return newRegion
}

// flush marks the region as flushed at its current epoch.
func (f *region) flush() {
	f.fsim.flushedEpoch = f.epoch
}
// GetLastFlushTSOfRegion mocks the TiKV-side checkpoint query: for each
// requested region it answers with the region checkpoint, or a per-region
// error (not found / flush-simulator error / epoch not match).
func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.GetLastFlushTSOfRegionRequest, opts ...grpc.CallOption) (*logbackup.GetLastFlushTSOfRegionResponse, error) {
	resp := &logbackup.GetLastFlushTSOfRegionResponse{
		Checkpoints: []*logbackup.RegionCheckpoint{},
	}
	for _, r := range in.Regions {
		region, ok := f.regions[r.Id]
		if !ok {
			// BUGFIX: the original branch also dereferenced the nil `region`
			// to fill the response; identify the region by the request instead.
			resp.Checkpoints = append(resp.Checkpoints, &logbackup.RegionCheckpoint{
				Err: &errorpb.Error{
					Message: "not found",
				},
				Region: &logbackup.RegionIdentity{
					Id:           r.Id,
					EpochVersion: r.EpochVersion,
				},
			})
			continue
		}
		if region.leader != f.id {
			// We host the region but aren't its leader.
			resp.Checkpoints = append(resp.Checkpoints, &logbackup.RegionCheckpoint{
				Err: &errorpb.Error{
					Message: "not found",
				},
				Region: &logbackup.RegionIdentity{
					Id:           region.id,
					EpochVersion: region.epoch,
				},
			})
			continue
		}
		if err := region.fsim.makeError(r.EpochVersion); err != nil {
			// Flush simulator injected an error (not flushed / stale flush).
			resp.Checkpoints = append(resp.Checkpoints, &logbackup.RegionCheckpoint{
				Err: err,
				Region: &logbackup.RegionIdentity{
					Id:           region.id,
					EpochVersion: region.epoch,
				},
			})
			continue
		}
		if region.epoch != r.EpochVersion {
			resp.Checkpoints = append(resp.Checkpoints, &logbackup.RegionCheckpoint{
				Err: &errorpb.Error{
					Message: "epoch not match",
				},
				Region: &logbackup.RegionIdentity{
					Id:           region.id,
					EpochVersion: region.epoch,
				},
			})
			continue
		}
		resp.Checkpoints = append(resp.Checkpoints, &logbackup.RegionCheckpoint{
			Checkpoint: region.checkpoint,
			Region: &logbackup.RegionIdentity{
				Id:           region.id,
				EpochVersion: region.epoch,
			},
		})
	}
	return resp, nil
}
// RegionScan gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
func (f *fakeCluster) RegionScan(ctx context.Context, key []byte, endKey []byte, limit int) ([]streamhelper.RegionWithLeader, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	// Keep the region list ordered so the scan walks the key space left to right.
	sort.Slice(f.regions, func(i, j int) bool {
		return bytes.Compare(f.regions[i].rng.StartKey, f.regions[j].rng.StartKey) < 0
	})
	result := make([]streamhelper.RegionWithLeader, 0, limit)
	for _, region := range f.regions {
		if overlaps(kv.KeyRange{StartKey: key, EndKey: endKey}, region.rng) && len(result) < limit {
			regionInfo := streamhelper.RegionWithLeader{
				Region: &metapb.Region{
					Id:       region.id,
					StartKey: region.rng.StartKey,
					EndKey:   region.rng.EndKey,
					RegionEpoch: &metapb.RegionEpoch{
						Version: region.epoch,
					},
				},
				Leader: &metapb.Peer{
					StoreId: region.leader,
				},
			}
			result = append(result, regionInfo)
		} else if bytes.Compare(region.rng.StartKey, key) > 0 {
			// Regions are sorted; once past the requested range, stop.
			break
		}
	}
	return result, nil
}

// GetLogBackupClient returns the fake store as the log backup client,
// optionally injecting an error via the onGetClient hook.
func (f *fakeCluster) GetLogBackupClient(ctx context.Context, storeID uint64) (logbackup.LogBackupClient, error) {
	if f.onGetClient != nil {
		err := f.onGetClient(storeID)
		if err != nil {
			return nil, err
		}
	}
	cli, ok := f.stores[storeID]
	if !ok {
		f.testCtx.Fatalf("the store %d doesn't exist", storeID)
	}
	return cli, nil
}
// findRegionById returns the region with the given ID, or nil when absent.
func (f *fakeCluster) findRegionById(rid uint64) *region {
	for _, r := range f.regions {
		if r.id == rid {
			return r
		}
	}
	return nil
}

// findRegionByKey returns the region whose range contains key.
// Panics when the key space has a hole (test invariant violation).
func (f *fakeCluster) findRegionByKey(key []byte) *region {
	for _, r := range f.regions {
		if bytes.Compare(key, r.rng.StartKey) >= 0 && (len(r.rng.EndKey) == 0 || bytes.Compare(key, r.rng.EndKey) < 0) {
			return r
		}
	}
	panic(fmt.Sprintf("inconsistent key space; key = %X", key))
}

// transferRegionTo places the region's peers exactly on newPeers,
// removing it from every other store.
func (f *fakeCluster) transferRegionTo(rid uint64, newPeers []uint64) {
	r := f.findRegionById(rid)
storeLoop:
	for _, store := range f.stores {
		for _, pid := range newPeers {
			if pid == store.id {
				store.regions[rid] = r
				continue storeLoop
			}
		}
		delete(store.regions, rid)
	}
}

// splitAt splits the region containing key at key and registers the new
// right half on every store hosting the original region.
func (f *fakeCluster) splitAt(key string) {
	k := []byte(key)
	r := f.findRegionByKey(k)
	newRegion := r.splitAt(f.idAlloc(), key)
	for _, store := range f.stores {
		_, ok := store.regions[r.id]
		if ok {
			store.regions[newRegion.id] = newRegion
		}
	}
	f.regions = append(f.regions, newRegion)
}

// idAlloc hands out monotonically increasing IDs (shared by stores and regions).
func (f *fakeCluster) idAlloc() uint64 {
	f.idAlloced++
	return f.idAlloced
}

// chooseStores picks n distinct store IDs at random.
// NOTE(review): slices with n > len(f.stores) would panic — callers pass 3
// with at least 4 stores; confirm before reusing with smaller clusters.
func (f *fakeCluster) chooseStores(n int) []uint64 {
	s := make([]uint64, 0, len(f.stores))
	for id := range f.stores {
		s = append(s, id)
	}
	rand.Shuffle(len(s), func(i, j int) {
		s[i], s[j] = s[j], s[i]
	})
	return s[:n]
}
// findPeers lists the IDs of stores hosting a peer of the region.
func (f *fakeCluster) findPeers(rid uint64) (result []uint64) {
	for _, store := range f.stores {
		if _, ok := store.regions[rid]; ok {
			result = append(result, store.id)
		}
	}
	return
}

// shuffleLeader moves the region leader to a random peer (possibly the same one).
func (f *fakeCluster) shuffleLeader(rid uint64) {
	r := f.findRegionById(rid)
	peers := f.findPeers(rid)
	rand.Shuffle(len(peers), func(i, j int) {
		peers[i], peers[j] = peers[j], peers[i]
	})
	newLeader := peers[0]
	r.leader = newLeader
}

// splitAndScatter splits at each key, then re-places every region on 3
// random stores with a random leader.
func (f *fakeCluster) splitAndScatter(keys ...string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, key := range keys {
		f.splitAt(key)
	}
	for _, r := range f.regions {
		f.transferRegionTo(r.id, f.chooseStores(3))
		f.shuffleLeader(r.id)
	}
}

// a stub once in the future we want to make different stores hold different region instances.
func (f *fakeCluster) updateRegion(rid uint64, mut func(*region)) {
	r := f.findRegionById(rid)
	mut(r)
}

// advanceCheckpoints bumps every region's checkpoint by a random amount,
// resets flush state, and returns the new cluster-wide minimum checkpoint.
func (f *fakeCluster) advanceCheckpoints() uint64 {
	minCheckpoint := uint64(math.MaxUint64)
	for _, r := range f.regions {
		f.updateRegion(r.id, func(r *region) {
			r.checkpoint += rand.Uint64() % 256
			if r.checkpoint < minCheckpoint {
				minCheckpoint = r.checkpoint
			}
			// A checkpoint advance invalidates the previous flush.
			r.fsim.flushedEpoch = 0
		})
	}
	return minCheckpoint
}
// createFakeCluster builds a cluster of n stores holding one region that
// covers the whole key space (peers on the first 3 stores, leader on the
// first). simEnabled toggles the flush simulator.
func createFakeCluster(t *testing.T, n int, simEnabled bool) *fakeCluster {
	c := &fakeCluster{
		stores:  map[uint64]*fakeStore{},
		regions: []*region{},
		testCtx: t,
	}
	stores := make([]*fakeStore, 0, n)
	for i := 0; i < n; i++ {
		s := new(fakeStore)
		s.id = c.idAlloc()
		s.regions = map[uint64]*region{}
		stores = append(stores, s)
	}
	initialRegion := &region{
		rng:        kv.KeyRange{},
		leader:     stores[0].id,
		epoch:      0,
		id:         c.idAlloc(),
		checkpoint: 0,
		fsim: flushSimulator{
			enabled: simEnabled,
		},
	}
	// Place the initial region on (up to) the first 3 stores.
	for i := 0; i < 3; i++ {
		if i < len(stores) {
			stores[i].regions[initialRegion.id] = initialRegion
		}
	}
	for _, s := range stores {
		c.stores[s.id] = s
	}
	c.regions = append(c.regions, initialRegion)
	return c
}

// String renders a region as "id(epoch):[start,end);checkpointLleader".
func (r *region) String() string {
	return fmt.Sprintf("%d(%d):[%s,%s);%dL%d", r.id, r.epoch, hex.EncodeToString(r.rng.StartKey), hex.EncodeToString(r.rng.EndKey), r.checkpoint, r.leader)
}

// String renders a store as its ID followed by its regions.
func (s *fakeStore) String() string {
	buf := new(strings.Builder)
	fmt.Fprintf(buf, "%d: ", s.id)
	for _, r := range s.regions {
		fmt.Fprintf(buf, "%s ", r)
	}
	return buf.String()
}

// flushAll flushes every region on every store (regardless of leadership).
func (f *fakeCluster) flushAll() {
	for _, r := range f.regions {
		r.flush()
	}
}

// flush flushes only the regions this store leads.
func (s *fakeStore) flush() {
	for _, r := range s.regions {
		if r.leader == s.id {
			r.flush()
		}
	}
}

// String renders the whole cluster: all regions, then each store.
func (f *fakeCluster) String() string {
	buf := new(strings.Builder)
	fmt.Fprint(buf, ">>> fake cluster <<<\nregions: ")
	for _, region := range f.regions {
		fmt.Fprint(buf, region, " ")
	}
	fmt.Fprintln(buf)
	for _, store := range f.stores {
		fmt.Fprintln(buf, store)
	}
	return buf.String()
}
// testEnv adapts fakeCluster into the advancer's Env: it records uploaded
// checkpoints and serves a single static task.
type testEnv struct {
	*fakeCluster
	// checkpoint is the last uploaded global checkpoint, guarded by mu.
	checkpoint uint64
	testCtx    *testing.T
	mu         sync.Mutex
}

// Begin emits one synthetic "add task" event named "whole".
func (t *testEnv) Begin(ctx context.Context, ch chan<- streamhelper.TaskEvent) error {
	tsk := streamhelper.TaskEvent{
		Type: streamhelper.EventAdd,
		Name: "whole",
		Info: &backup.StreamBackupTaskInfo{
			Name: "whole",
		},
	}
	ch <- tsk
	return nil
}

// UploadV3GlobalCheckpointForTask records the checkpoint, failing the test
// if it ever rolls backward (the advancer must be monotonic).
func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string, checkpoint uint64) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if checkpoint < t.checkpoint {
		t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
	}
	t.checkpoint = checkpoint
	return nil
}

// getCheckpoint reads the last uploaded checkpoint under the lock.
func (t *testEnv) getCheckpoint() uint64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.checkpoint
}

View File

@ -1,5 +1,5 @@
// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.
package stream
package streamhelper
import (
"bytes"
@ -28,6 +28,8 @@ type Checkpoint struct {
ID uint64 `json:"id,omitempty"`
Version uint64 `json:"epoch_version,omitempty"`
TS uint64 `json:"ts"`
IsGlobal bool `json:"-"`
}
type CheckpointType int
@ -36,12 +38,15 @@ const (
CheckpointTypeStore CheckpointType = iota
CheckpointTypeRegion
CheckpointTypeTask
CheckpointTypeGlobal
CheckpointTypeInvalid
)
// Type returns the type(provider) of the checkpoint.
func (cp Checkpoint) Type() CheckpointType {
switch {
case cp.IsGlobal:
return CheckpointTypeGlobal
case cp.ID == 0 && cp.Version == 0:
return CheckpointTypeTask
case cp.ID != 0 && cp.Version == 0:
@ -72,7 +77,7 @@ func ParseCheckpoint(task string, key, value []byte) (Checkpoint, error) {
segs := bytes.Split(key, []byte("/"))
var checkpoint Checkpoint
switch string(segs[0]) {
case "store":
case checkpointTypeStore:
if len(segs) != 2 {
return checkpoint, errors.Annotatef(berrors.ErrPiTRMalformedMetadata,
"the store checkpoint seg mismatch; segs = %v", segs)
@ -82,7 +87,9 @@ func ParseCheckpoint(task string, key, value []byte) (Checkpoint, error) {
return checkpoint, err
}
checkpoint.ID = id
case "region":
case checkpointTypeGlobal:
checkpoint.IsGlobal = true
case checkpointTypeRegion:
if len(segs) != 3 {
return checkpoint, errors.Annotatef(berrors.ErrPiTRMalformedMetadata,
"the region checkpoint seg mismatch; segs = %v", segs)
@ -187,6 +194,17 @@ func (c *MetaDataClient) CleanLastErrorOfTask(ctx context.Context, taskName stri
return nil
}
// UploadV3GlobalCheckpointForTask writes the task's global checkpoint
// (big-endian encoded) into the metadata store.
func (c *MetaDataClient) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error {
	if _, err := c.KV.Put(ctx, GlobalCheckpointOf(taskName), string(encodeUint64(checkpoint))); err != nil {
		return err
	}
	return nil
}
// GetTask get the basic task handle from the metadata storage.
func (c *MetaDataClient) GetTask(ctx context.Context, taskName string) (*Task, error) {
resp, err := c.Get(ctx, TaskOf(taskName))
@ -235,25 +253,35 @@ func (c *MetaDataClient) GetTaskWithPauseStatus(ctx context.Context, taskName st
return &Task{cli: c, Info: taskInfo}, paused, nil
}
// GetAllTasks get all of tasks from metadata storage.
func (c *MetaDataClient) GetAllTasks(ctx context.Context) ([]Task, error) {
scanner := scanEtcdPrefix(c.Client, PrefixOfTask())
kvs, err := scanner.AllPages(ctx, 1)
func (c *MetaDataClient) TaskByInfo(t backuppb.StreamBackupTaskInfo) *Task {
return &Task{cli: c, Info: t}
}
func (c *MetaDataClient) GetAllTasksWithRevision(ctx context.Context) ([]Task, int64, error) {
resp, err := c.KV.Get(ctx, PrefixOfTask(), clientv3.WithPrefix())
if err != nil {
return nil, errors.Trace(err)
} else if len(kvs) == 0 {
return nil, nil
return nil, 0, errors.Trace(err)
}
kvs := resp.Kvs
if len(kvs) == 0 {
return nil, resp.Header.GetRevision(), nil
}
tasks := make([]Task, len(kvs))
for idx, kv := range kvs {
err = proto.Unmarshal(kv.Value, &tasks[idx].Info)
if err != nil {
return nil, errors.Trace(err)
return nil, 0, errors.Trace(err)
}
tasks[idx].cli = c
}
return tasks, nil
return tasks, resp.Header.GetRevision(), nil
}
// GetAllTasks get all of tasks from metadata storage.
func (c *MetaDataClient) GetAllTasks(ctx context.Context) ([]Task, error) {
	// Delegate to the revision-aware variant and discard the revision.
	tasks, _, err := c.GetAllTasksWithRevision(ctx)
	if err != nil {
		return nil, err
	}
	return tasks, nil
}
// GetTaskCount get the count of tasks from metadata storage.
@ -375,6 +403,14 @@ func (t *Task) Step(ctx context.Context, store uint64, ts uint64) error {
return nil
}
// UploadGlobalCheckpoint uploads the global checkpoint ts of this task.
func (t *Task) UploadGlobalCheckpoint(ctx context.Context, ts uint64) error {
	key := GlobalCheckpointOf(t.Info.Name)
	if _, err := t.cli.KV.Put(ctx, key, string(encodeUint64(ts))); err != nil {
		return err
	}
	return nil
}
func (t *Task) LastError(ctx context.Context) (map[uint64]backuppb.StreamBackupError, error) {
storeToError := map[uint64]backuppb.StreamBackupError{}
prefix := LastErrorPrefixOf(t.Info.Name)

View File

@ -0,0 +1,315 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
package streamhelper
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"github.com/pingcap/errors"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"go.uber.org/zap"
)
const (
	// defaultBatchSize is both the request batch size and the input
	// channel capacity of a store collector.
	defaultBatchSize = 1024
)

// onSuccessHook is called with (checkpoint, range) for each region whose
// checkpoint was fetched successfully.
type onSuccessHook = func(uint64, kv.KeyRange)

// storeCollector collects the region checkpoints from some store.
// it receives requests from the input channel, batching the requests, and send them to the store.
// because the server supports batching, the range of request regions can be discrete.
// note this is a temporary struct, its lifetime is shorter that the tick of advancer.
type storeCollector struct {
	storeID   uint64
	batchSize int
	service   LogBackupService

	input chan RegionWithLeader
	// the oneshot error reporter.
	err *atomic.Value
	// whether the recv and send loop has exited.
	doneMessenger chan struct{}
	onSuccess     onSuccessHook

	// concurrency safety:
	// those fields should only be write on the goroutine running `recvLoop`.
	// Once it exits, we can read those fields.
	currentRequest logbackup.GetLastFlushTSOfRegionRequest
	checkpoint     uint64
	inconsistent   []kv.KeyRange
	regionMap      map[uint64]kv.KeyRange
}
// newStoreCollector creates a collector that batches checkpoint queries
// for a single store.
func newStoreCollector(storeID uint64, srv LogBackupService) *storeCollector {
	collector := &storeCollector{
		storeID:       storeID,
		batchSize:     defaultBatchSize,
		service:       srv,
		input:         make(chan RegionWithLeader, defaultBatchSize),
		err:           new(atomic.Value),
		doneMessenger: make(chan struct{}),
		regionMap:     map[uint64]kv.KeyRange{},
	}
	return collector
}
// reportErr records err as the collector's one-shot failure.
// Only the first error wins; later ones are logged and dropped.
func (c *storeCollector) reportErr(err error) {
	if oldErr := c.Err(); oldErr != nil {
		// BUGFIX: the original logged the new error under "old" and the
		// previously stored error under "new".
		log.Warn("reporting error twice, ignoring", logutil.AShortError("old", oldErr), logutil.AShortError("new", err))
		return
	}
	c.err.Store(err)
}
// Err returns the recorded error, or nil when none was reported.
func (c *storeCollector) Err() error {
	err, ok := c.err.Load().(error)
	if !ok {
		return nil
	}
	return err
}

// setOnSuccessHook installs the per-region success callback.
func (c *storeCollector) setOnSuccessHook(hook onSuccessHook) {
	c.onSuccess = hook
}

// begin runs the receive loop until the input closes or fails, records any
// error, and finally signals completion via doneMessenger.
func (c *storeCollector) begin(ctx context.Context) {
	err := c.recvLoop(ctx)
	if err != nil {
		log.Warn("collector loop meet error", logutil.ShortError(err))
		c.reportErr(err)
	}
	close(c.doneMessenger)
}
// recvLoop drains the input channel, accumulating regions into the pending
// request and flushing whenever the batch is full. When the input closes,
// it flushes the remainder and returns. Panics are converted into errors.
func (c *storeCollector) recvLoop(ctx context.Context) (err error) {
	defer utils.PanicToErr(&err)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case r, ok := <-c.input:
			if !ok {
				// Input closed: flush the final (partial) batch.
				return c.sendPendingRequests(ctx)
			}

			if r.Leader.StoreId != c.storeID {
				// Tolerated (the server will report per-region errors),
				// but it indicates a routing mistake upstream.
				log.Warn("trying to request to store which isn't the leader of region.",
					zap.Uint64("region", r.Region.Id),
					zap.Uint64("target-store", c.storeID),
					zap.Uint64("leader", r.Leader.StoreId),
				)
			}
			c.appendRegionMap(r)
			c.currentRequest.Regions = append(c.currentRequest.Regions, &logbackup.RegionIdentity{
				Id:           r.Region.GetId(),
				EpochVersion: r.Region.GetRegionEpoch().GetVersion(),
			})
			if len(c.currentRequest.Regions) >= c.batchSize {
				err := c.sendPendingRequests(ctx)
				if err != nil {
					return err
				}
			}
		}
	}
}

// appendRegionMap remembers the key range of a region so failures can later
// be reported as key ranges rather than region IDs.
func (c *storeCollector) appendRegionMap(r RegionWithLeader) {
	c.regionMap[r.Region.GetId()] = kv.KeyRange{StartKey: r.Region.StartKey, EndKey: r.Region.EndKey}
}
// StoreCheckpoints is the result of collecting checkpoints from one or more
// stores: the minimal successful checkpoint plus the ranges that failed.
type StoreCheckpoints struct {
	// HasCheckpoint distinguishes "checkpoint 0" from "no data collected".
	HasCheckpoint    bool
	Checkpoint       uint64
	FailureSubRanges []kv.KeyRange
}

// merge folds another result in: keep the smaller checkpoint and
// concatenate the failed ranges.
func (s *StoreCheckpoints) merge(other StoreCheckpoints) {
	if other.HasCheckpoint && (other.Checkpoint < s.Checkpoint || !s.HasCheckpoint) {
		s.Checkpoint = other.Checkpoint
		s.HasCheckpoint = true
	}
	s.FailureSubRanges = append(s.FailureSubRanges, other.FailureSubRanges...)
}
// String renders the result as "StoreCheckpoints:<cp|none>:(remaining N ranges)".
func (s *StoreCheckpoints) String() string {
	var b strings.Builder
	b.WriteString("StoreCheckpoints:")
	if !s.HasCheckpoint {
		b.WriteString("none")
	} else {
		b.WriteString(strconv.Itoa(int(s.Checkpoint)))
	}
	fmt.Fprintf(&b, ":(remaining %d ranges)", len(s.FailureSubRanges))
	return b.String()
}
// spawn starts the collector's receive loop and returns a "join" closure:
// calling it closes the input (no more regions), waits for the loop to
// drain, and returns the collected result or the recorded error.
func (c *storeCollector) spawn(ctx context.Context) func(context.Context) (StoreCheckpoints, error) {
	go c.begin(ctx)
	return func(cx context.Context) (StoreCheckpoints, error) {
		close(c.input)
		select {
		case <-cx.Done():
			return StoreCheckpoints{}, cx.Err()
		case <-c.doneMessenger:
		}
		if err := c.Err(); err != nil {
			return StoreCheckpoints{}, err
		}
		// Safe to read the loop-owned fields now: recvLoop has exited.
		sc := StoreCheckpoints{
			HasCheckpoint:    c.checkpoint != 0,
			Checkpoint:       c.checkpoint,
			FailureSubRanges: c.inconsistent,
		}
		return sc, nil
	}
}
// sendPendingRequests flushes the batched region list to the store, folding
// per-region successes into the running minimum checkpoint and per-region
// failures into the inconsistent (retry) ranges.
func (c *storeCollector) sendPendingRequests(ctx context.Context) error {
	log.Debug("sending batch", zap.Int("size", len(c.currentRequest.Regions)), zap.Uint64("store", c.storeID))
	cli, err := c.service.GetLogBackupClient(ctx, c.storeID)
	if err != nil {
		return err
	}
	cps, err := cli.GetLastFlushTSOfRegion(ctx, &c.currentRequest)
	if err != nil {
		return err
	}
	metrics.GetCheckpointBatchSize.WithLabelValues("checkpoint").Observe(float64(len(c.currentRequest.GetRegions())))
	// Reset the batch before processing the response.
	c.currentRequest = logbackup.GetLastFlushTSOfRegionRequest{}
	for _, checkpoint := range cps.Checkpoints {
		if checkpoint.Err != nil {
			log.Debug("failed to get region checkpoint", zap.Stringer("err", checkpoint.Err))
			// Remember the failed range so the caller can rescan it.
			c.inconsistent = append(c.inconsistent, c.regionMap[checkpoint.Region.Id])
		} else {
			if c.onSuccess != nil {
				c.onSuccess(checkpoint.Checkpoint, c.regionMap[checkpoint.Region.Id])
			}
			// assuming the checkpoint would never be zero, use it as the placeholder. (1970 is so far away...)
			if checkpoint.Checkpoint < c.checkpoint || c.checkpoint == 0 {
				c.checkpoint = checkpoint.Checkpoint
			}
		}
	}
	return nil
}
// runningStoreCollector pairs a store collector with its join closure
// (returned by spawn).
type runningStoreCollector struct {
	collector *storeCollector
	wait      func(context.Context) (StoreCheckpoints, error)
}

// clusterCollector is the controller for collecting region checkpoints for the cluster.
// It creates multi store collectors.
// ┌──────────────────────┐ Requesting ┌────────────┐
// ┌─►│ StoreCollector[id=1] ├─────────────►│ TiKV[id=1] │
// │ └──────────────────────┘ └────────────┘
// │
// │Owns
// ┌──────────────────┐ │ ┌──────────────────────┐ Requesting ┌────────────┐
// │ ClusterCollector ├─────┼─►│ StoreCollector[id=4] ├─────────────►│ TiKV[id=4] │
// └──────────────────┘ │ └──────────────────────┘ └────────────┘
// │
// │
// │ ┌──────────────────────┐ Requesting ┌────────────┐
// └─►│ StoreCollector[id=5] ├─────────────►│ TiKV[id=5] │
// └──────────────────────┘ └────────────┘
type clusterCollector struct {
	// mu guards collectors and noLeaders during collectRegion.
	mu         sync.Mutex
	collectors map[uint64]runningStoreCollector
	// noLeaders accumulates ranges of regions that currently have no leader.
	noLeaders []kv.KeyRange
	onSuccess onSuccessHook

	// The context for spawning sub collectors.
	// Because the collectors are running lazily,
	// keep the initial context for all subsequent goroutines,
	// so we can make sure we can cancel all subtasks.
	masterCtx context.Context
	cancel    context.CancelFunc
	srv       LogBackupService
}
// NewClusterCollector creates a new cluster collector.
// collectors are the structure transform region information to checkpoint information,
// by requesting the checkpoint of regions in the store.
func NewClusterCollector(ctx context.Context, srv LogBackupService) *clusterCollector {
	childCtx, cancel := context.WithCancel(ctx)
	collector := new(clusterCollector)
	collector.collectors = make(map[uint64]runningStoreCollector)
	collector.masterCtx = childCtx
	collector.cancel = cancel
	collector.srv = srv
	return collector
}
// setOnSuccessHook sets the hook when getting checkpoint of some region.
func (c *clusterCollector) setOnSuccessHook(hook onSuccessHook) {
	c.onSuccess = hook
}

// collectRegion adds a region to the collector.
// It routes the region to (and lazily spawns) the collector of its leader
// store; leaderless regions are recorded as failure ranges instead.
func (c *clusterCollector) collectRegion(r RegionWithLeader) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Already canceled (a sibling collector failed): silently drop.
	if c.masterCtx.Err() != nil {
		return nil
	}

	if r.Leader.GetStoreId() == 0 {
		log.Warn("there are regions without leader", zap.Uint64("region", r.Region.GetId()))
		c.noLeaders = append(c.noLeaders, kv.KeyRange{StartKey: r.Region.StartKey, EndKey: r.Region.EndKey})
		return nil
	}
	leader := r.Leader.StoreId
	_, ok := c.collectors[leader]
	if !ok {
		// First region for this store: spawn its collector.
		coll := newStoreCollector(leader, c.srv)
		if c.onSuccess != nil {
			coll.setOnSuccessHook(c.onSuccess)
		}
		c.collectors[leader] = runningStoreCollector{
			collector: coll,
			wait:      coll.spawn(c.masterCtx),
		}
	}

	sc := c.collectors[leader].collector
	select {
	case sc.input <- r:
		return nil
	case <-sc.doneMessenger:
		// The collector died before accepting the region; cancel siblings.
		err := sc.Err()
		if err != nil {
			c.cancel()
		}
		return err
	}
}
// Finish finishes collecting the region checkpoints, wait and returning the final result.
// Note this takes the ownership of this collector, you may create a new collector for next use.
func (c *clusterCollector) Finish(ctx context.Context) (StoreCheckpoints, error) {
	defer c.cancel()
	// Seed the result with regions that never had a leader.
	result := StoreCheckpoints{FailureSubRanges: c.noLeaders}
	for id, coll := range c.collectors {
		r, err := coll.wait(ctx)
		if err != nil {
			// Any store failing fails the whole round (checkpoint unsafe).
			return StoreCheckpoints{}, errors.Annotatef(err, "store %d", id)
		}
		result.merge(r)
		log.Debug("get checkpoint", zap.Stringer("checkpoint", &r), zap.Stringer("merged", &result))
	}
	return result, nil
}

View File

@ -0,0 +1,82 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
package config
import (
"time"
"github.com/spf13/pflag"
)
const (
	flagBackoffTime      = "backoff-time"
	flagMaxBackoffTime   = "max-backoff-time"
	flagTickInterval     = "tick-interval"
	flagFullScanDiffTick = "full-scan-tick"
	flagAdvancingByCache = "advancing-by-cache"

	// DefaultConsistencyCheckTick is how many light ticks pass between
	// consistency checks.
	DefaultConsistencyCheckTick = 5
	// DefaultTryAdvanceThreshold bounds how far behind the checkpoint may
	// lag before trying to advance.
	DefaultTryAdvanceThreshold = 3 * time.Minute
)

var (
	// DefaultMaxConcurrencyAdvance is the concurrency of advancing.
	DefaultMaxConcurrencyAdvance = 8
)

// Config is the tunables of the checkpoint advancer.
type Config struct {
	// The gap between two retries.
	BackoffTime time.Duration `toml:"backoff-time" json:"backoff-time"`
	// When after this time we cannot collect the safe resolved ts, give up.
	MaxBackoffTime time.Duration `toml:"max-backoff-time" json:"max-backoff-time"`
	// The gap between calculating checkpoints.
	TickDuration time.Duration `toml:"tick-interval" json:"tick-interval"`
	// The backoff time of full scan.
	FullScanTick int `toml:"full-scan-tick" json:"full-scan-tick"`

	// Whether enable the optimization -- use a cached heap to advancing the global checkpoint.
	// This may reduce the gap of checkpoint but may cost more CPU.
	AdvancingByCache bool `toml:"advancing-by-cache" json:"advancing-by-cache"`
}
// DefineFlagsForCheckpointAdvancerConfig registers the command-line flags that
// (*Config).GetFromFlags later reads. The default values here must stay
// consistent with Default().
func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) {
	f.Duration(flagBackoffTime, 5*time.Second, "The gap between two retries.")
	// Align the help text with the documented meaning of Config.MaxBackoffTime
	// (give up after this long, not "advance after this long").
	f.Duration(flagMaxBackoffTime, 20*time.Minute, "After how long we should give up advancing the checkpoint.")
	// Typo fix: "how log" -> "how long".
	f.Duration(flagTickInterval, 12*time.Second, "From how long we trigger the tick (advancing the checkpoint).")
	f.Bool(flagAdvancingByCache, true, "Whether enable the optimization -- use a cached heap to advancing the global checkpoint.")
	f.Int(flagFullScanDiffTick, 4, "The backoff of full scan.")
}
// Default returns the default advancer config, matching the flag defaults
// declared in DefineFlagsForCheckpointAdvancerConfig.
func Default() Config {
	cfg := Config{
		BackoffTime:      5 * time.Second,
		MaxBackoffTime:   20 * time.Minute,
		TickDuration:     12 * time.Second,
		FullScanTick:     4,
		AdvancingByCache: true,
	}
	return cfg
}
// GetFromFlags fills the config from the flag set previously registered by
// DefineFlagsForCheckpointAdvancerConfig, returning the first lookup error.
func (conf *Config) GetFromFlags(f *pflag.FlagSet) error {
	var err error
	if conf.BackoffTime, err = f.GetDuration(flagBackoffTime); err != nil {
		return err
	}
	if conf.MaxBackoffTime, err = f.GetDuration(flagMaxBackoffTime); err != nil {
		return err
	}
	if conf.TickDuration, err = f.GetDuration(flagTickInterval); err != nil {
		return err
	}
	if conf.FullScanTick, err = f.GetInt(flagFullScanDiffTick); err != nil {
		return err
	}
	if conf.AdvancingByCache, err = f.GetBool(flagAdvancingByCache); err != nil {
		return err
	}
	return nil
}

View File

@ -1,7 +1,7 @@
// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.
// This package tests the login in MetaClient with a embed etcd.
package stream_test
package streamhelper_test
import (
"context"
@ -15,7 +15,7 @@ import (
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/tablecodec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/kv"
@ -63,11 +63,11 @@ func runEtcd(t *testing.T) (*embed.Etcd, *clientv3.Client) {
return etcd, cli
}
func simpleRanges(tableCount int) stream.Ranges {
ranges := stream.Ranges{}
func simpleRanges(tableCount int) streamhelper.Ranges {
ranges := streamhelper.Ranges{}
for i := 0; i < tableCount; i++ {
base := int64(i*2 + 1)
ranges = append(ranges, stream.Range{
ranges = append(ranges, streamhelper.Range{
StartKey: tablecodec.EncodeTablePrefix(base),
EndKey: tablecodec.EncodeTablePrefix(base + 1),
})
@ -75,9 +75,9 @@ func simpleRanges(tableCount int) stream.Ranges {
return ranges
}
func simpleTask(name string, tableCount int) stream.TaskInfo {
func simpleTask(name string, tableCount int) streamhelper.TaskInfo {
backend, _ := storage.ParseBackend("noop://", nil)
task, err := stream.NewTask(name).
task, err := streamhelper.NewTask(name).
FromTS(1).
UntilTS(1000).
WithRanges(simpleRanges(tableCount)...).
@ -110,7 +110,7 @@ func keyNotExists(t *testing.T, key []byte, etcd *embed.Etcd) {
require.Len(t, r.KVs, 0)
}
func rangeMatches(t *testing.T, ranges stream.Ranges, etcd *embed.Etcd) {
func rangeMatches(t *testing.T, ranges streamhelper.Ranges, etcd *embed.Etcd) {
r, err := etcd.Server.KV().Range(context.TODO(), ranges[0].StartKey, ranges[len(ranges)-1].EndKey, mvcc.RangeOptions{})
require.NoError(t, err)
if len(r.KVs) != len(ranges) {
@ -133,33 +133,34 @@ func rangeIsEmpty(t *testing.T, prefix []byte, etcd *embed.Etcd) {
func TestIntegration(t *testing.T) {
etcd, cli := runEtcd(t)
defer etcd.Server.Stop()
metaCli := stream.MetaDataClient{Client: cli}
metaCli := streamhelper.MetaDataClient{Client: cli}
t.Run("TestBasic", func(t *testing.T) { testBasic(t, metaCli, etcd) })
t.Run("TestForwardProgress", func(t *testing.T) { testForwardProgress(t, metaCli, etcd) })
t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.TaskEventClient{MetaDataClient: metaCli}) })
}
func TestChecking(t *testing.T) {
noop, _ := storage.ParseBackend("noop://", nil)
// The name must not contains slash.
_, err := stream.NewTask("/root").
_, err := streamhelper.NewTask("/root").
WithRange([]byte("1"), []byte("2")).
WithTableFilter("*.*").
ToStorage(noop).
Check()
require.ErrorIs(t, errors.Cause(err), berrors.ErrPiTRInvalidTaskInfo)
// Must specify the external storage.
_, err = stream.NewTask("root").
_, err = streamhelper.NewTask("root").
WithRange([]byte("1"), []byte("2")).
WithTableFilter("*.*").
Check()
require.ErrorIs(t, errors.Cause(err), berrors.ErrPiTRInvalidTaskInfo)
// Must specift the table filter and range?
_, err = stream.NewTask("root").
_, err = streamhelper.NewTask("root").
ToStorage(noop).
Check()
require.ErrorIs(t, errors.Cause(err), berrors.ErrPiTRInvalidTaskInfo)
// Happy path.
_, err = stream.NewTask("root").
_, err = streamhelper.NewTask("root").
WithRange([]byte("1"), []byte("2")).
WithTableFilter("*.*").
ToStorage(noop).
@ -167,43 +168,43 @@ func TestChecking(t *testing.T) {
require.NoError(t, err)
}
func testBasic(t *testing.T, metaCli stream.MetaDataClient, etcd *embed.Etcd) {
func testBasic(t *testing.T, metaCli streamhelper.MetaDataClient, etcd *embed.Etcd) {
ctx := context.Background()
taskName := "two_tables"
task := simpleTask(taskName, 2)
taskData, err := task.PBInfo.Marshal()
require.NoError(t, err)
require.NoError(t, metaCli.PutTask(ctx, task))
keyIs(t, []byte(stream.TaskOf(taskName)), taskData, etcd)
keyNotExists(t, []byte(stream.Pause(taskName)), etcd)
rangeMatches(t, []stream.Range{
{StartKey: []byte(stream.RangeKeyOf(taskName, tablecodec.EncodeTablePrefix(1))), EndKey: tablecodec.EncodeTablePrefix(2)},
{StartKey: []byte(stream.RangeKeyOf(taskName, tablecodec.EncodeTablePrefix(3))), EndKey: tablecodec.EncodeTablePrefix(4)},
keyIs(t, []byte(streamhelper.TaskOf(taskName)), taskData, etcd)
keyNotExists(t, []byte(streamhelper.Pause(taskName)), etcd)
rangeMatches(t, []streamhelper.Range{
{StartKey: []byte(streamhelper.RangeKeyOf(taskName, tablecodec.EncodeTablePrefix(1))), EndKey: tablecodec.EncodeTablePrefix(2)},
{StartKey: []byte(streamhelper.RangeKeyOf(taskName, tablecodec.EncodeTablePrefix(3))), EndKey: tablecodec.EncodeTablePrefix(4)},
}, etcd)
remoteTask, err := metaCli.GetTask(ctx, taskName)
require.NoError(t, err)
require.NoError(t, remoteTask.Pause(ctx))
keyExists(t, []byte(stream.Pause(taskName)), etcd)
keyExists(t, []byte(streamhelper.Pause(taskName)), etcd)
require.NoError(t, metaCli.PauseTask(ctx, taskName))
keyExists(t, []byte(stream.Pause(taskName)), etcd)
keyExists(t, []byte(streamhelper.Pause(taskName)), etcd)
paused, err := remoteTask.IsPaused(ctx)
require.NoError(t, err)
require.True(t, paused)
require.NoError(t, metaCli.ResumeTask(ctx, taskName))
keyNotExists(t, []byte(stream.Pause(taskName)), etcd)
keyNotExists(t, []byte(streamhelper.Pause(taskName)), etcd)
require.NoError(t, metaCli.ResumeTask(ctx, taskName))
keyNotExists(t, []byte(stream.Pause(taskName)), etcd)
keyNotExists(t, []byte(streamhelper.Pause(taskName)), etcd)
paused, err = remoteTask.IsPaused(ctx)
require.NoError(t, err)
require.False(t, paused)
require.NoError(t, metaCli.DeleteTask(ctx, taskName))
keyNotExists(t, []byte(stream.TaskOf(taskName)), etcd)
rangeIsEmpty(t, []byte(stream.RangesOf(taskName)), etcd)
keyNotExists(t, []byte(streamhelper.TaskOf(taskName)), etcd)
rangeIsEmpty(t, []byte(streamhelper.RangesOf(taskName)), etcd)
}
func testForwardProgress(t *testing.T, metaCli stream.MetaDataClient, etcd *embed.Etcd) {
func testForwardProgress(t *testing.T, metaCli streamhelper.MetaDataClient, etcd *embed.Etcd) {
ctx := context.Background()
taskName := "many_tables"
taskInfo := simpleTask(taskName, 65)
@ -227,3 +228,34 @@ func testForwardProgress(t *testing.T, metaCli stream.MetaDataClient, etcd *embe
require.NoError(t, err)
require.Equal(t, store2Checkpoint, uint64(40))
}
// testStreamListening checks that Begin delivers add/delete task events in
// order and closes the channel once the context is canceled.
func testStreamListening(t *testing.T, metaCli streamhelper.TaskEventClient) {
	ctx, cancel := context.WithCancel(context.Background())
	taskName := "simple"
	taskInfo := simpleTask(taskName, 4)
	require.NoError(t, metaCli.PutTask(ctx, taskInfo))
	ch := make(chan streamhelper.TaskEvent, 1024)
	require.NoError(t, metaCli.Begin(ctx, ch))
	require.NoError(t, metaCli.DeleteTask(ctx, taskName))

	taskName2 := "simple2"
	taskInfo2 := simpleTask(taskName2, 4)
	require.NoError(t, metaCli.PutTask(ctx, taskInfo2))
	require.NoError(t, metaCli.DeleteTask(ctx, taskName2))

	expectEvent := func(ty streamhelper.EventType, name string) {
		ev := <-ch
		require.Equal(t, ev.Type, ty)
		require.Equal(t, ev.Name, name)
	}
	expectEvent(streamhelper.EventAdd, taskName)
	expectEvent(streamhelper.EventDel, taskName)
	expectEvent(streamhelper.EventAdd, taskName2)
	expectEvent(streamhelper.EventDel, taskName2)

	cancel()
	_, ok := <-ch
	require.False(t, ok)
}

View File

@ -1,5 +1,5 @@
// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.
package stream
package streamhelper
import (
"bytes"
@ -21,10 +21,13 @@ const (
streamKeyPrefix = "/tidb/br-stream"
taskInfoPath = "/info"
// nolint:deadcode,varcheck
taskCheckpointPath = "/checkpoint"
taskRangesPath = "/ranges"
taskPausePath = "/pause"
taskLastErrorPath = "/last-error"
taskCheckpointPath = "/checkpoint"
taskRangesPath = "/ranges"
taskPausePath = "/pause"
taskLastErrorPath = "/last-error"
checkpointTypeGlobal = "central_global"
checkpointTypeRegion = "region"
checkpointTypeStore = "store"
)
var (
@ -78,6 +81,11 @@ func CheckPointsOf(task string) string {
return buf.String()
}
// GlobalCheckpointOf returns the path to the "global" checkpoint of some task.
// Normally it would be <prefix>/checkpoint/<task-name>/central_global.
func GlobalCheckpointOf(task string) string {
	return path.Join(streamKeyPrefix, taskCheckpointPath, task, checkpointTypeGlobal)
}
// CheckpointOf returns the checkpoint prefix of some store.
// Normally it would be <prefix>/checkpoint/<task-name>/<store-id(binary-u64)>.
func CheckPointOf(task string, store uint64) string {

View File

@ -1,5 +1,5 @@
// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.
package stream
package streamhelper
import (
"context"

View File

@ -0,0 +1,122 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
package streamhelper
import (
"bytes"
"context"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/br/pkg/utils"
)
const (
defaultPageSize = 2048
)
// RegionWithLeader is a region paired with its current leader peer.
type RegionWithLeader struct {
	Region *metapb.Region
	Leader *metapb.Peer
}

// RegionScanner abstracts listing regions by key range (e.g. backed by PD).
type RegionScanner interface {
	// RegionScan gets a list of regions, starts from the region that contains key.
	// Limit limits the maximum number of regions returned.
	RegionScan(ctx context.Context, key, endKey []byte, limit int) ([]RegionWithLeader, error)
}

// RegionIter iterates over the regions covering [startKey, endKey) page by page.
type RegionIter struct {
	cli              RegionScanner
	startKey, endKey []byte
	// currentStartKey is the resume point for the next call to Next.
	currentStartKey []byte
	// When the endKey become "", we cannot check whether the scan is done by
	// comparing currentStartKey and endKey (because "" has different meaning in start key and end key).
	// So set this to `true` when endKey == "" and the scan is done.
	infScanFinished bool

	// The max slice size returned by `Next`.
	// This can be changed before calling `Next` each time,
	// however no thread safety provided.
	PageSize int
}
// IterateRegion creates an iterator over the region range [startKey, endKey),
// starting with the default page size.
func IterateRegion(cli RegionScanner, startKey, endKey []byte) *RegionIter {
	return &RegionIter{
		cli:             cli,
		startKey:        startKey,
		endKey:          endKey,
		currentStartKey: startKey,
		PageSize:        defaultPageSize,
	}
}
// CheckRegionConsistency verifies that the scanned regions are non-empty,
// contiguous, and together cover [startKey, endKey).
func CheckRegionConsistency(startKey, endKey []byte, regions []RegionWithLeader) error {
	// current pd can't guarantee the consistency of returned regions
	if len(regions) == 0 {
		return errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endKey: %s",
			redact.Key(startKey), redact.Key(endKey))
	}
	first := regions[0].Region
	last := regions[len(regions)-1].Region
	if bytes.Compare(first.StartKey, startKey) > 0 {
		return errors.Annotatef(berrors.ErrPDBatchScanRegion, "first region's startKey > startKey, startKey: %s, regionStartKey: %s",
			redact.Key(startKey), redact.Key(first.StartKey))
	}
	if len(last.EndKey) != 0 && bytes.Compare(last.EndKey, endKey) < 0 {
		return errors.Annotatef(berrors.ErrPDBatchScanRegion, "last region's endKey < endKey, endKey: %s, regionEndKey: %s",
			redact.Key(endKey), redact.Key(last.EndKey))
	}
	// Every region must start exactly where its predecessor ends.
	prev := regions[0]
	for _, cur := range regions[1:] {
		if !bytes.Equal(prev.Region.EndKey, cur.Region.StartKey) {
			return errors.Annotatef(berrors.ErrPDBatchScanRegion, "region endKey not equal to next region startKey, endKey: %s, startKey: %s",
				redact.Key(prev.Region.EndKey), redact.Key(cur.Region.StartKey))
		}
		prev = cur
	}
	return nil
}
// Next get the next page of regions.
// It retries the scan (up to 30 times, 500ms backoff — see InitialRetryState
// below) and validates the page with CheckRegionConsistency before returning.
func (r *RegionIter) Next(ctx context.Context) ([]RegionWithLeader, error) {
	var rs []RegionWithLeader
	state := utils.InitialRetryState(30, 500*time.Millisecond, 500*time.Millisecond)
	err := utils.WithRetry(ctx, func() error {
		regions, err := r.cli.RegionScan(ctx, r.currentStartKey, r.endKey, r.PageSize)
		if err != nil {
			return err
		}
		if len(regions) > 0 {
			endKey := regions[len(regions)-1].Region.GetEndKey()
			if err := CheckRegionConsistency(r.currentStartKey, endKey, regions); err != nil {
				return err
			}
			rs = regions
			return nil
		}
		// An empty scan result is never acceptable here: CheckRegionConsistency
		// reports it as an error, so WithRetry will try the scan again.
		return CheckRegionConsistency(r.currentStartKey, r.endKey, regions)
	}, &state)
	if err != nil {
		return nil, err
	}
	// On success rs is guaranteed non-empty (the empty case errors above).
	endKey := rs[len(rs)-1].Region.EndKey
	// We have meet the last region.
	if len(endKey) == 0 {
		r.infScanFinished = true
	}
	r.currentStartKey = endKey
	return rs, nil
}
// Done checks whether the iteration is done.
func (r *RegionIter) Done() bool {
	// A bounded scan is done once the cursor reaches (or passes) endKey.
	if len(r.endKey) > 0 {
		return bytes.Compare(r.currentStartKey, r.endKey) >= 0
	}
	// endKey == "" means "scan to the end of the key space"; rely on the
	// flag Next sets when it meets the last region.
	return r.infScanFinished
}

View File

@ -0,0 +1,170 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
package streamhelper
import (
"bytes"
"context"
"fmt"
"strings"
"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
clientv3 "go.etcd.io/etcd/client/v3"
)
// EventType classifies a change observed on the task prefix.
type EventType int

const (
	// EventAdd: a task was created (etcd PUT).
	EventAdd EventType = iota
	// EventDel: a task was removed (etcd DELETE).
	EventDel
	// EventErr: the watcher failed to decode an event.
	EventErr
)

// String returns a short human-readable name of the event type,
// or "Unknown" for values outside the declared set.
func (t EventType) String() string {
	names := [...]string{
		EventAdd: "Add",
		EventDel: "Del",
		EventErr: "Err",
	}
	if t < 0 || int(t) >= len(names) {
		return "Unknown"
	}
	return names[t]
}
// TaskEvent is a change of a log-backup task observed from etcd:
// the kind of change, the task name, the decoded task info, and the
// error (set when Type == EventErr).
type TaskEvent struct {
	Type EventType
	Name string
	Info *backuppb.StreamBackupTaskInfo
	Err  error
}
// String formats the event as "Type(Name)", appending the error when present.
func (t *TaskEvent) String() string {
	if t.Err == nil {
		return fmt.Sprintf("%s(%s)", t.Type, t.Name)
	}
	return fmt.Sprintf("%s(%s, err = %s)", t.Type, t.Name, t.Err)
}
// TaskEventClient extends MetaDataClient with the ability to observe task
// changes as a stream of TaskEvents (see Begin).
type TaskEventClient struct {
	MetaDataClient
}
// errorEvent wraps err into a TaskEvent of type EventErr.
func errorEvent(err error) TaskEvent {
	return TaskEvent{Type: EventErr, Err: err}
}
// toTaskEvent decodes a raw etcd event under the task prefix into a TaskEvent:
// the task name comes from the key suffix, the type from the etcd event type,
// and the info from unmarshalling the value.
func toTaskEvent(event *clientv3.Event) (TaskEvent, error) {
	key := string(event.Kv.Key)
	if !strings.HasPrefix(key, PrefixOfTask()) {
		return TaskEvent{}, errors.Annotatef(berrors.ErrInvalidArgument, "the path isn't a task path (%s)", key)
	}
	te := TaskEvent{Name: strings.TrimPrefix(key, PrefixOfTask())}
	switch event.Type {
	case clientv3.EventTypeDelete:
		te.Type = EventDel
	case clientv3.EventTypePut:
		te.Type = EventAdd
	default:
		return TaskEvent{}, errors.Annotatef(berrors.ErrInvalidArgument, "event type is wrong (%s)", event.Type)
	}
	te.Info = new(backuppb.StreamBackupTaskInfo)
	if err := proto.Unmarshal(event.Kv.Value, te.Info); err != nil {
		return TaskEvent{}, err
	}
	return te, nil
}
// eventFromWatch converts every raw event in a watch response into a TaskEvent.
// Conversion failures are kept as EventErr entries rather than aborting the
// whole batch; the returned error is currently always nil.
func eventFromWatch(resp clientv3.WatchResponse) ([]TaskEvent, error) {
	converted := make([]TaskEvent, 0, len(resp.Events))
	for _, raw := range resp.Events {
		te, err := toTaskEvent(raw)
		if err != nil {
			// toTaskEvent returns the zero TaskEvent on failure, so this is
			// equivalent to tagging it with EventErr and the error.
			te = errorEvent(err)
		}
		converted = append(converted, te)
	}
	return converted, nil
}
// startListen watches the task prefix from the given revision and pumps the
// converted events into ch from a background goroutine. The channel is closed
// when the watch channel closes, when a conversion error event is emitted, or
// after the context is done (any already-delivered responses are drained first).
func (t TaskEventClient) startListen(ctx context.Context, rev int64, ch chan<- TaskEvent) {
	c := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev))
	// handleResponse forwards one watch response; it returns false to stop
	// the listen loop (after sending a single error event).
	handleResponse := func(resp clientv3.WatchResponse) bool {
		events, err := eventFromWatch(resp)
		if err != nil {
			ch <- errorEvent(err)
			return false
		}
		for _, event := range events {
			ch <- event
		}
		return true
	}
	go func() {
		defer close(ch)
		for {
			select {
			case resp, ok := <-c:
				if !ok {
					return
				}
				if !handleResponse(resp) {
					return
				}
			case <-ctx.Done():
				// drain the remain event from channel.
				for {
					select {
					case resp, ok := <-c:
						if !ok {
							return
						}
						if !handleResponse(resp) {
							return
						}
					default:
						return
					}
				}
			}
		}
	}()
}
// getFullTasksAsEvent lists all existing tasks as EventAdd events, also
// returning the etcd revision of the snapshot so the caller can start
// watching from right after it.
func (t TaskEventClient) getFullTasksAsEvent(ctx context.Context) ([]TaskEvent, int64, error) {
	tasks, rev, err := t.GetAllTasksWithRevision(ctx)
	if err != nil {
		return nil, 0, err
	}
	events := make([]TaskEvent, 0, len(tasks))
	for i := range tasks {
		// Take the address of the slice element, NOT of a `range` loop
		// variable: `&task.Info` of the loop variable is the same pointer on
		// every iteration, which would make all events share the last task's
		// info.
		task := &tasks[i]
		events = append(events, TaskEvent{
			Type: EventAdd,
			Name: task.Info.Name,
			Info: &task.Info,
		})
	}
	return events, rev, nil
}
// Begin emits one EventAdd per already-existing task into ch, then keeps
// watching for further changes starting from the revision right after the
// snapshot.
func (t TaskEventClient) Begin(ctx context.Context, ch chan<- TaskEvent) error {
	snapshot, rev, err := t.getFullTasksAsEvent(ctx)
	if err != nil {
		return err
	}
	// Note: maybe `go` here so we won't block?
	for i := range snapshot {
		ch <- snapshot[i]
	}
	t.startListen(ctx, rev+1, ch)
	return nil
}

View File

@ -0,0 +1,216 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
package streamhelper
import (
"fmt"
"strings"
"sync"
"time"
"github.com/google/btree"
"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/oracle"
)
// CheckpointsCache is the heap-like cache for checkpoints.
//
// "Checkpoint" is the "Resolved TS" of some range.
// A resolved ts is a "watermark" for the system, which:
// - implies there won't be any transactions (in some range) commit with `commit_ts` smaller than this TS.
// - is monotonic increasing.
// A "checkpoint" is a "safe" Resolved TS, which:
// - is a TS *less than* the real resolved ts of now.
// - is based on range (it only promises there won't be new committed txns in the range).
// - the checkpoint of union of ranges is the minimal checkpoint of all ranges.
// As an example:
// +----------------------------------+
// ^-----------^ (Checkpoint = 42)
//             ^---------------^ (Checkpoint = 76)
// ^-----------------------^ (Checkpoint = min(42, 76) = 42)
//
// For calculating the global checkpoint, we can make a heap-like structure:
// Checkpoint Ranges
// 42         -> {[0, 8], [16, 100]}
// 1002       -> {[8, 16]}
// 1082       -> {[100, inf]}
// For now, the checkpoint of range [8, 16] and [100, inf] won't affect the global checkpoint
// directly, so we can try to advance only the ranges of {[0, 8], [16, 100]} (whose checkpoint is stale).
// Once they get advanced, the global checkpoint would be advanced then,
// and we don't need to update all ranges (because some new ranges don't need to be advanced so quickly.)
type CheckpointsCache interface {
	fmt.Stringer
	// InsertRange inserts a range with specified TS to the cache.
	InsertRange(ts uint64, rng kv.KeyRange)
	// InsertRanges inserts a set of ranges that sharing checkpoint to the cache.
	InsertRanges(rst RangesSharesTS)
	// CheckpointTS returns the now global (union of all ranges) checkpoint of the cache.
	CheckpointTS() uint64
	// PopRangesWithGapGT pops the ranges whose checkpoint lags behind now by more than d.
	PopRangesWithGapGT(d time.Duration) []*RangesSharesTS
	// ConsistencyCheck checks whether the cached ranges cover the whole key space.
	ConsistencyCheck() error
	// Clear the cache.
	Clear()
}
// NoOPCheckpointCache is used when cache disabled.
// Mutating methods are no-ops; querying methods must never be called when the
// cache is disabled, hence they panic or return an error.
type NoOPCheckpointCache struct{}

// InsertRange does nothing.
func (NoOPCheckpointCache) InsertRange(ts uint64, rng kv.KeyRange) {}

// InsertRanges does nothing.
func (NoOPCheckpointCache) InsertRanges(rst RangesSharesTS) {}

// Clear does nothing.
func (NoOPCheckpointCache) Clear() {}

// String identifies this implementation for logs.
func (NoOPCheckpointCache) String() string {
	return "NoOPCheckpointCache"
}

// CheckpointTS must never be called on the disabled cache.
func (NoOPCheckpointCache) CheckpointTS() uint64 {
	panic("invalid state: NoOPCheckpointCache should never be used in advancing!")
}

// PopRangesWithGapGT must never be called on the disabled cache.
func (NoOPCheckpointCache) PopRangesWithGapGT(d time.Duration) []*RangesSharesTS {
	panic("invalid state: NoOPCheckpointCache should never be used in advancing!")
}

// ConsistencyCheck always fails: nothing is cached, so coverage cannot be verified.
func (NoOPCheckpointCache) ConsistencyCheck() error {
	return errors.Annotatef(berrors.ErrUnsupportedOperation, "invalid state: NoOPCheckpointCache should never be used in advancing!")
}
// RangesSharesTS is a set of ranges shares the same timestamp.
type RangesSharesTS struct {
	TS     uint64
	Ranges []kv.KeyRange
}

// String formats as "@<minute-second of the TS's wall time>R<range count>".
func (rst *RangesSharesTS) String() string {
	// Make a more friendly string.
	return fmt.Sprintf("@%sR%d", oracle.GetTimeFromTS(rst.TS).Format("0405"), len(rst.Ranges))
}

// Less orders RangesSharesTS by TS ascending, so the btree's minimum is the
// oldest (smallest) checkpoint.
func (rst *RangesSharesTS) Less(other btree.Item) bool {
	return rst.TS < other.(*RangesSharesTS).TS
}
// Checkpoints is a heap that collects all checkpoints of
// regions, it supports query the latest checkpoint fast.
// This structure is thread safe.
type Checkpoints struct {
	// tree stores *RangesSharesTS keyed by TS (see RangesSharesTS.Less);
	// its minimum element is the global checkpoint.
	tree *btree.BTree
	// mu guards tree: every method locks it before touching the tree.
	mu sync.Mutex
}

// NewCheckpoints creates an empty, ready-to-use checkpoint heap.
func NewCheckpoints() *Checkpoints {
	return &Checkpoints{
		tree: btree.New(32),
	}
}
// String formats the slowest 5 ranges sharing TS to string, appending
// "O<n>" when more entries were omitted.
func (h *Checkpoints) String() string {
	h.mu.Lock()
	defer h.mu.Unlock()
	var sb strings.Builder
	shown := 0
	total := h.tree.Len()
	h.tree.Ascend(func(i btree.Item) bool {
		// %s on *RangesSharesTS invokes its String method.
		fmt.Fprintf(&sb, "%s;", i.(*RangesSharesTS))
		shown++
		return shown < 5
	})
	if rest := total - shown; rest > 0 {
		fmt.Fprintf(&sb, "O%d", rest)
	}
	return sb.String()
}
// InsertRanges inserts a RangesSharesTS directly into the tree, merging its
// ranges into an existing node with the same TS if one is present.
func (h *Checkpoints) InsertRanges(r RangesSharesTS) {
	h.mu.Lock()
	defer h.mu.Unlock()
	existing := h.tree.Get(&r)
	if existing == nil {
		h.tree.ReplaceOrInsert(&r)
		return
	}
	node := existing.(*RangesSharesTS)
	node.Ranges = append(node.Ranges, r.Ranges...)
}
// InsertRange inserts a single range with its TS into the tree, creating the
// node for that TS on first use.
func (h *Checkpoints) InsertRange(ts uint64, rng kv.KeyRange) {
	h.mu.Lock()
	defer h.mu.Unlock()
	// Get returns a nil item when the TS is absent; the assertion then
	// yields ok == false.
	node, ok := h.tree.Get(&RangesSharesTS{TS: ts}).(*RangesSharesTS)
	if !ok {
		node = &RangesSharesTS{TS: ts}
		h.tree.ReplaceOrInsert(node)
	}
	node.Ranges = append(node.Ranges, rng)
}
// Clear removes all records in the checkpoint cache.
func (h *Checkpoints) Clear() {
	h.mu.Lock()
	defer h.mu.Unlock()
	// false: presumably do not recycle freed nodes into a freelist —
	// confirm against the google/btree documentation of Clear.
	h.tree.Clear(false)
}
// PopRangesWithGapGT pops ranges with gap greater than the specified duration.
// NOTE: maybe make something like `DrainIterator` for better composing?
func (h *Checkpoints) PopRangesWithGapGT(d time.Duration) []*RangesSharesTS {
	h.mu.Lock()
	defer h.mu.Unlock()
	popped := []*RangesSharesTS{}
	for {
		oldest, ok := h.tree.Min().(*RangesSharesTS)
		// Stop once the tree is drained or the smallest checkpoint is
		// fresh enough.
		if !ok || time.Since(oracle.GetTimeFromTS(oldest.TS)) < d {
			return popped
		}
		popped = append(popped, oldest)
		h.tree.DeleteMin()
	}
}
// CheckpointTS returns the cached global checkpoint: the smallest TS in the
// tree, or 0 when the tree is empty.
func (h *Checkpoints) CheckpointTS() uint64 {
	h.mu.Lock()
	defer h.mu.Unlock()
	if oldest, ok := h.tree.Min().(*RangesSharesTS); ok {
		return oldest.TS
	}
	return 0
}
// ConsistencyCheck checks whether the tree contains the full range of key space:
// after collapsing, all cached ranges must merge into the single range ["", "").
// TODO: add argument to it and check a sub range.
func (h *Checkpoints) ConsistencyCheck() error {
	// Copy the ranges out so CollapseRanges runs outside the lock.
	h.mu.Lock()
	ranges := make([]kv.KeyRange, 0, 1024)
	h.tree.Ascend(func(i btree.Item) bool {
		ranges = append(ranges, i.(*RangesSharesTS).Ranges...)
		return true
	})
	h.mu.Unlock()

	collapsed := CollapseRanges(len(ranges), func(i int) kv.KeyRange { return ranges[i] })
	coversAll := len(collapsed) == 1 && len(collapsed[0].StartKey) == 0 && len(collapsed[0].EndKey) == 0
	if !coversAll {
		return errors.Annotatef(berrors.ErrPiTRMalformedMetadata,
			"the region tree cannot cover the key space, collapsed: %s", logutil.StringifyKeys(collapsed))
	}
	return nil
}

View File

@ -0,0 +1,161 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
package streamhelper_test
import (
"math"
"testing"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/kv"
"github.com/stretchr/testify/require"
)
// TestInsert drives InsertRange with scripted sequences and verifies that
// CheckpointTS reports the minimal inserted TS and PopRangesWithGapGT(0)
// returns every TS bucket with exactly the ranges inserted for it.
func TestInsert(t *testing.T) {
	// Each case is a script that calls insert(ts, startKey, endKey).
	cases := []func(func(ts uint64, a, b string)){
		func(insert func(ts uint64, a, b string)) {
			insert(1, "", "01")
			insert(1, "01", "02")
			insert(2, "02", "022")
			insert(4, "022", "")
		},
		func(insert func(ts uint64, a, b string)) {
			insert(1, "", "01")
			insert(2, "", "01")
			insert(2, "011", "02")
			insert(1, "", "")
			insert(65, "03", "04")
		},
	}
	for _, c := range cases {
		cps := streamhelper.NewCheckpoints()
		// expected mirrors what the tree should hold, grouped by TS.
		expected := map[uint64]*streamhelper.RangesSharesTS{}
		// checkpoint tracks the minimal TS seen so far.
		checkpoint := uint64(math.MaxUint64)
		insert := func(ts uint64, a, b string) {
			cps.InsertRange(ts, kv.KeyRange{
				StartKey: []byte(a),
				EndKey:   []byte(b),
			})
			i, ok := expected[ts]
			if !ok {
				expected[ts] = &streamhelper.RangesSharesTS{TS: ts, Ranges: []kv.KeyRange{{StartKey: []byte(a), EndKey: []byte(b)}}}
			} else {
				i.Ranges = append(i.Ranges, kv.KeyRange{StartKey: []byte(a), EndKey: []byte(b)})
			}
			if ts < checkpoint {
				checkpoint = ts
			}
		}
		c(insert)
		require.Equal(t, checkpoint, cps.CheckpointTS())
		// A zero gap pops every bucket currently in the tree.
		rngs := cps.PopRangesWithGapGT(0)
		for _, rng := range rngs {
			other := expected[rng.TS]
			require.Equal(t, other, rng)
		}
	}
}
// TestMergeRanges verifies CollapseRanges: overlapping or adjacent key ranges
// are merged, disjoint ones kept, and "" is handled as both the minimal start
// key and the "infinite" end key.
func TestMergeRanges(t *testing.T) {
	// r is a shorthand for building a key range from string literals.
	r := func(a, b string) kv.KeyRange {
		return kv.KeyRange{StartKey: []byte(a), EndKey: []byte(b)}
	}
	type Case struct {
		expected  []kv.KeyRange
		parameter []kv.KeyRange
	}
	cases := []Case{
		{
			parameter: []kv.KeyRange{r("01", "01111"), r("0111", "0112")},
			expected:  []kv.KeyRange{r("01", "0112")},
		},
		{
			parameter: []kv.KeyRange{r("01", "03"), r("02", "04")},
			expected:  []kv.KeyRange{r("01", "04")},
		},
		{
			parameter: []kv.KeyRange{r("04", "08"), r("09", "10")},
			expected:  []kv.KeyRange{r("04", "08"), r("09", "10")},
		},
		{
			parameter: []kv.KeyRange{r("01", "03"), r("02", "04"), r("05", "07"), r("08", "09")},
			expected:  []kv.KeyRange{r("01", "04"), r("05", "07"), r("08", "09")},
		},
		{
			parameter: []kv.KeyRange{r("01", "02"), r("012", "")},
			expected:  []kv.KeyRange{r("01", "")},
		},
		{
			parameter: []kv.KeyRange{r("", "01"), r("02", "03"), r("021", "")},
			expected:  []kv.KeyRange{r("", "01"), r("02", "")},
		},
		{
			parameter: []kv.KeyRange{r("", "01"), r("001", "")},
			expected:  []kv.KeyRange{r("", "")},
		},
		{
			parameter: []kv.KeyRange{r("", "01"), r("", ""), r("", "02")},
			expected:  []kv.KeyRange{r("", "")},
		},
		{
			parameter: []kv.KeyRange{r("", "01"), r("01", ""), r("", "02"), r("", "03"), r("01", "02")},
			expected:  []kv.KeyRange{r("", "")},
		},
	}
	for i, c := range cases {
		result := streamhelper.CollapseRanges(len(c.parameter), func(i int) kv.KeyRange {
			return c.parameter[i]
		})
		require.Equal(t, c.expected, result, "case = %d", i)
	}
}
// TestInsertRanges verifies that InsertRanges merges sets sharing the same TS
// into one bucket, and that popping returns buckets in ascending TS order.
func TestInsertRanges(t *testing.T) {
	// r builds a key range; rs builds a RangesSharesTS bucket.
	r := func(a, b string) kv.KeyRange {
		return kv.KeyRange{StartKey: []byte(a), EndKey: []byte(b)}
	}
	rs := func(ts uint64, ranges ...kv.KeyRange) streamhelper.RangesSharesTS {
		return streamhelper.RangesSharesTS{TS: ts, Ranges: ranges}
	}

	type Case struct {
		Expected   []streamhelper.RangesSharesTS
		Parameters []streamhelper.RangesSharesTS
	}

	cases := []Case{
		{
			Parameters: []streamhelper.RangesSharesTS{
				rs(1, r("0", "1"), r("1", "2")),
				rs(1, r("2", "3"), r("3", "4")),
			},
			Expected: []streamhelper.RangesSharesTS{
				rs(1, r("0", "1"), r("1", "2"), r("2", "3"), r("3", "4")),
			},
		},
		{
			Parameters: []streamhelper.RangesSharesTS{
				rs(1, r("0", "1")),
				rs(2, r("2", "3")),
				rs(1, r("4", "5"), r("6", "7")),
			},
			Expected: []streamhelper.RangesSharesTS{
				rs(1, r("0", "1"), r("4", "5"), r("6", "7")),
				rs(2, r("2", "3")),
			},
		},
	}

	for _, c := range cases {
		theTree := streamhelper.NewCheckpoints()
		for _, p := range c.Parameters {
			theTree.InsertRanges(p)
		}
		// A zero gap pops every bucket, ordered by ascending TS.
		ranges := theTree.PopRangesWithGapGT(0)
		for i, rs := range ranges {
			require.ElementsMatch(t, c.Expected[i].Ranges, rs.Ranges, "case = %#v", c)
		}
	}
}

View File

@ -39,6 +39,8 @@ import (
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/streamhelper"
advancercfg "github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
@ -70,6 +72,7 @@ var (
StreamStatus = "log status"
StreamTruncate = "log truncate"
StreamMetadata = "log metadata"
StreamCtl = "log ctl"
skipSummaryCommandList = map[string]struct{}{
StreamStatus: {},
@ -90,6 +93,7 @@ var StreamCommandMap = map[string]func(c context.Context, g glue.Glue, cmdName s
StreamStatus: RunStreamStatus,
StreamTruncate: RunStreamTruncate,
StreamMetadata: RunStreamMetadata,
StreamCtl: RunStreamAdvancer,
}
// StreamConfig specifies the configure about backup stream
@ -111,6 +115,9 @@ type StreamConfig struct {
// Spec for the command `status`.
JSONOutput bool `json:"json-output" toml:"json-output"`
// Spec for the command `advancer`.
AdvancerCfg advancercfg.Config `json:"advancer-config" toml:"advancer-config"`
}
func (cfg *StreamConfig) makeStorage(ctx context.Context) (storage.ExternalStorage, error) {
@ -521,7 +528,7 @@ func RunStreamStart(
return errors.Trace(err)
}
cli := stream.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
// It supports single stream log task currently.
if count, err := cli.GetTaskCount(ctx); err != nil {
return errors.Trace(err)
@ -548,7 +555,7 @@ func RunStreamStart(
return errors.Annotate(berrors.ErrInvalidArgument, "nothing need to observe")
}
ti := stream.TaskInfo{
ti := streamhelper.TaskInfo{
PBInfo: backuppb.StreamBackupTaskInfo{
Storage: streamMgr.bc.GetStorageBackend(),
StartTs: cfg.StartTS,
@ -623,7 +630,7 @@ func RunStreamStop(
}
defer streamMgr.close()
cli := stream.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
// to add backoff
ti, err := cli.GetTask(ctx, cfg.TaskName)
if err != nil {
@ -673,7 +680,7 @@ func RunStreamPause(
}
defer streamMgr.close()
cli := stream.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
// to add backoff
ti, isPaused, err := cli.GetTaskWithPauseStatus(ctx, cfg.TaskName)
if err != nil {
@ -731,7 +738,7 @@ func RunStreamResume(
}
defer streamMgr.close()
cli := stream.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
cli := streamhelper.NewMetaDataClient(streamMgr.mgr.GetDomain().GetEtcdClient())
// to add backoff
ti, isPaused, err := cli.GetTaskWithPauseStatus(ctx, cfg.TaskName)
if err != nil {
@ -776,6 +783,31 @@ func RunStreamResume(
return nil
}
// RunStreamAdvancer starts a central worker that campaigns for ownership and,
// as the owner, advances the log-backup global checkpoint until the context
// is canceled.
func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error {
	ctx, cancel := context.WithCancel(c)
	defer cancel()
	mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
		cfg.CheckRequirements, false)
	if err != nil {
		return err
	}

	// NOTE(review): neither mgr nor etcdCLI is explicitly closed here —
	// presumably the advancer runs until process exit; confirm the intended
	// lifetime of these clients.
	etcdCLI, err := dialEtcdWithCfg(ctx, cfg.Config)
	if err != nil {
		return err
	}
	env := streamhelper.CliEnv(mgr.StoreManager, etcdCLI)
	advancer := streamhelper.NewCheckpointAdvancer(env)
	advancer.UpdateConfig(cfg.AdvancerCfg)
	daemon := streamhelper.NewAdvancerDaemon(advancer, streamhelper.OwnerManagerForLogBackup(ctx, etcdCLI))
	loop, err := daemon.Begin(ctx)
	if err != nil {
		return err
	}
	// loop presumably blocks, driving the daemon until ctx is done — confirm
	// in AdvancerDaemon.Begin.
	loop()
	return nil
}
func checkConfigForStatus(cfg *StreamConfig) error {
if len(cfg.PD) == 0 {
return errors.Annotatef(berrors.ErrInvalidArgument,
@ -793,7 +825,7 @@ func makeStatusController(ctx context.Context, cfg *StreamConfig, g glue.Glue) (
if err != nil {
return nil, err
}
cli := stream.NewMetaDataClient(etcdCLI)
cli := streamhelper.NewMetaDataClient(etcdCLI)
var printer stream.TaskPrinter
if !cfg.JSONOutput {
printer = stream.PrintTaskByTable(console)

View File

@ -0,0 +1,244 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
package utils
import (
"context"
"crypto/tls"
"os"
"sync"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)
const (
	// dialTimeout bounds the time spent establishing a single gRPC connection to a store.
	dialTimeout = 30 * time.Second
	// resetRetryTimes is the number of dial attempts ResetBackupClient makes before giving up.
	resetRetryTimes = 3
)
// Pool is a lazy pool of gRPC channels.
// When `Get` called, it lazily allocates new connection if connection not full.
// If it's full, then it will return allocated channels round-robin.
type Pool struct {
	// mu guards all fields below.
	mu sync.Mutex
	// conns holds the connections dialed so far; it grows lazily up to cap.
	conns []*grpc.ClientConn
	// next is the round-robin cursor into conns, used once the pool is full.
	next int
	// cap is the maximum number of connections the pool will create.
	cap int
	// newConn is the factory used to dial a new connection on demand.
	newConn func(ctx context.Context) (*grpc.ClientConn, error)
}
// takeConns detaches and returns every connection currently held by the pool,
// leaving the pool empty with its round-robin cursor reset.
func (p *Pool) takeConns() (conns []*grpc.ClientConn) {
	p.mu.Lock()
	defer p.mu.Unlock()
	conns = p.conns
	p.conns = nil
	p.next = 0
	return conns
}
// Close closes the conn pool.
// Errors from closing individual connections are logged and otherwise ignored.
func (p *Pool) Close() {
	conns := p.takeConns()
	for _, conn := range conns {
		closeErr := conn.Close()
		if closeErr != nil {
			log.Warn("failed to close clientConn", zap.String("target", conn.Target()), zap.Error(closeErr))
		}
	}
}
// Get tries to get an existing connection from the pool, or make a new one if the pool not full.
// While the pool is below capacity it dials fresh connections via newConn;
// once full, it hands out the cached connections round-robin.
func (p *Pool) Get(ctx context.Context) (*grpc.ClientConn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.cap <= 0 {
		// Guard against a zero-capacity pool: the round-robin path below
		// would otherwise index into an empty conns slice and panic.
		return nil, errors.New("connection pool has non-positive capacity")
	}
	if len(p.conns) < p.cap {
		c, err := p.newConn(ctx)
		if err != nil {
			return nil, err
		}
		p.conns = append(p.conns, c)
		return c, nil
	}
	conn := p.conns[p.next]
	p.next = (p.next + 1) % p.cap
	return conn, nil
}
// NewConnPool creates a new Pool by the specified conn factory function and capacity.
func NewConnPool(capacity int, newConn func(ctx context.Context) (*grpc.ClientConn, error)) *Pool {
	// The zero value of sync.Mutex is ready to use, so mu needs no explicit
	// initialization here.
	return &Pool{
		cap:     capacity,
		conns:   make([]*grpc.ClientConn, 0, capacity),
		newConn: newConn,
	}
}
// StoreManager owns the gRPC connections to stores, caching one connection
// per store ID, together with the PD client and TLS settings used to dial them.
type StoreManager struct {
	pdClient pd.Client
	// grpcClis caches established gRPC connections keyed by store ID,
	// guarded by the embedded mutex.
	grpcClis struct {
		mu sync.Mutex
		clis map[uint64]*grpc.ClientConn
	}
	// keepalive holds the client keepalive parameters applied to every dialed connection.
	keepalive keepalive.ClientParameters
	// tlsConf enables TLS transport credentials when non-nil; otherwise dials are insecure.
	tlsConf *tls.Config
}
// NewStoreManager create a new manager for gRPC connections to stores.
func NewStoreManager(pdCli pd.Client, kl keepalive.ClientParameters, tlsConf *tls.Config) *StoreManager {
	mgr := &StoreManager{
		pdClient:  pdCli,
		keepalive: kl,
		tlsConf:   tlsConf,
	}
	// Initialize the connection cache after construction to avoid spelling
	// out the anonymous struct type in the literal.
	mgr.grpcClis.clis = make(map[uint64]*grpc.ClientConn)
	return mgr
}
// PDClient returns the PD client this manager was constructed with.
func (mgr *StoreManager) PDClient() pd.Client {
	return mgr.pdClient
}
// getGrpcConnLocked dials a new gRPC connection to the store identified by
// storeID. The caller must hold mgr.grpcClis.mu; the connection is NOT cached
// here — callers decide whether to store it in the cache.
func (mgr *StoreManager) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
	// Test hook: optionally notify an external shell by creating a signal
	// file, then delay, so tests can observe connection establishment.
	failpoint.Inject("hint-get-backup-client", func(v failpoint.Value) {
		log.Info("failpoint hint-get-backup-client injected, "+
			"process will notify the shell.", zap.Uint64("store", storeID))
		if sigFile, ok := v.(string); ok {
			file, err := os.Create(sigFile)
			if err != nil {
				log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
			}
			if file != nil {
				file.Close()
			}
		}
		time.Sleep(3 * time.Second)
	})
	// Resolve the store's address through PD.
	store, err := mgr.pdClient.GetStore(ctx, storeID)
	if err != nil {
		return nil, errors.Trace(err)
	}
	// Use TLS transport credentials when configured; otherwise dial insecurely.
	opt := grpc.WithInsecure()
	if mgr.tlsConf != nil {
		opt = grpc.WithTransportCredentials(credentials.NewTLS(mgr.tlsConf))
	}
	// Bound the (blocking) dial by dialTimeout.
	ctx, cancel := context.WithTimeout(ctx, dialTimeout)
	bfConf := backoff.DefaultConfig
	bfConf.MaxDelay = time.Second * 3
	// Prefer the peer address, falling back to the client address when empty.
	addr := store.GetPeerAddress()
	if addr == "" {
		addr = store.GetAddress()
	}
	conn, err := grpc.DialContext(
		ctx,
		addr,
		opt,
		grpc.WithBlock(),
		grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
		grpc.WithKeepaliveParams(mgr.keepalive),
	)
	cancel()
	if err != nil {
		return nil, berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to make connection to store %d", storeID)
	}
	return conn, nil
}
// WithConn runs f against the gRPC connection for the given store, dialing
// and caching a new connection when none exists yet. The internal mutex is
// held for the whole call, including while f runs.
func (mgr *StoreManager) WithConn(ctx context.Context, storeID uint64, f func(*grpc.ClientConn)) error {
	if ctxErr := ctx.Err(); ctxErr != nil {
		return errors.Trace(ctxErr)
	}

	mgr.grpcClis.mu.Lock()
	defer mgr.grpcClis.mu.Unlock()

	conn, ok := mgr.grpcClis.clis[storeID]
	if !ok {
		// Cache miss: dial a fresh connection and remember it for later calls.
		newConn, dialErr := mgr.getGrpcConnLocked(ctx, storeID)
		if dialErr != nil {
			return errors.Trace(dialErr)
		}
		mgr.grpcClis.clis[storeID] = newConn
		conn = newConn
	}
	f(conn)
	return nil
}
// ResetBackupClient reset the connection for backup client.
// It drops any cached connection for the store, then re-dials with up to
// resetRetryTimes attempts, sleeping between failures, and returns a backup
// client on the fresh connection.
func (mgr *StoreManager) ResetBackupClient(ctx context.Context, storeID uint64) (backuppb.BackupClient, error) {
	if ctx.Err() != nil {
		return nil, errors.Trace(ctx.Err())
	}
	mgr.grpcClis.mu.Lock()
	defer mgr.grpcClis.mu.Unlock()
	if conn, ok := mgr.grpcClis.clis[storeID]; ok {
		// Find a cached backup client.
		// Close and evict it so the retry loop below always dials anew.
		log.Info("Reset backup client", zap.Uint64("storeID", storeID))
		err := conn.Close()
		if err != nil {
			log.Warn("close backup connection failed, ignore it", zap.Uint64("storeID", storeID))
		}
		delete(mgr.grpcClis.clis, storeID)
	}
	var (
		conn *grpc.ClientConn
		err  error
	)
	for retry := 0; retry < resetRetryTimes; retry++ {
		conn, err = mgr.getGrpcConnLocked(ctx, storeID)
		if err != nil {
			log.Warn("failed to reset grpc connection, retry it",
				zap.Int("retry time", retry), logutil.ShortError(err))
			// Back off a little longer on each attempt (3s, 4s, ...).
			// NOTE(review): the mutex stays held while sleeping, blocking other callers.
			time.Sleep(time.Duration(retry+3) * time.Second)
			continue
		}
		// Success: cache the new connection; err is nil here.
		mgr.grpcClis.clis[storeID] = conn
		break
	}
	if err != nil {
		return nil, errors.Trace(err)
	}
	return backuppb.NewBackupClient(conn), nil
}
// Close closes all client in Mgr.
// Safe to call on a nil receiver; close failures are logged and ignored.
func (mgr *StoreManager) Close() {
	if mgr == nil {
		return
	}
	mgr.grpcClis.mu.Lock()
	defer mgr.grpcClis.mu.Unlock()
	for _, conn := range mgr.grpcClis.clis {
		if closeErr := conn.Close(); closeErr != nil {
			log.Error("fail to close Mgr", zap.Error(closeErr))
		}
	}
}
// TLSConfig returns the TLS configuration used for store connections,
// or nil when the manager is nil or TLS is not configured.
func (mgr *StoreManager) TLSConfig() *tls.Config {
	if mgr == nil {
		return nil
	}
	return mgr.tlsConf
}

View File

@ -3,7 +3,10 @@
package utils
import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@ -107,3 +110,23 @@ func (pool *WorkerPool) RecycleWorker(worker *Worker) {
// HasWorker returns whether the pool currently has at least one idle worker.
func (pool *WorkerPool) HasWorker() bool {
	return pool.IdleCount() > 0
}
// PanicToErr recovers when the execution get panicked, and set the error provided by the arg.
// generally, this would be used with named return value and `defer`, like:
//
// func foo() (err error) {
//   defer utils.PanicToErr(&err)
//   return maybePanic()
// }
//
// Before using this, there are some hints for reducing resource leakage or bugs:
// - If any of clean work (by `defer`) relies on the error (say, when error happens, rollback some operations.), please
// place `defer this` AFTER that.
// - All resources allocated should be freed by the `defer` syntax, or when panicking, they may not be recycled.
func PanicToErr(err *error) {
	recovered := recover()
	if recovered == nil {
		return
	}
	// Overwrite *err with the panic payload and log the recovery with a stack.
	*err = errors.Annotatef(berrors.ErrUnknown, "panicked when executing, message: %v", recovered)
	log.Warn("checkpoint advancer panicked, recovering", zap.StackSkip("stack", 1), logutil.ShortError(*err))
}

View File

@ -32,6 +32,7 @@ import (
"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
zaplog "github.com/pingcap/log"
logbackupconf "github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/parser/terror"
typejson "github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/logutil"
@ -256,8 +257,10 @@ type Config struct {
// BallastObjectSize set the initial size of the ballast object, the unit is byte.
BallastObjectSize int `toml:"ballast-object-size" json:"ballast-object-size"`
// EnableGlobalKill indicates whether to enable global kill.
EnableGlobalKill bool `toml:"enable-global-kill" json:"enable-global-kill"`
TrxSummary TrxSummary `toml:"transaction-summary" json:"transaction-summary"`
EnableGlobalKill bool `toml:"enable-global-kill" json:"enable-global-kill"`
// LogBackup controls the log backup related items.
LogBackup LogBackup `toml:"log-backup" json:"log-backup"`
// The following items are deprecated. We need to keep them here temporarily
// to support the upgrade process. They can be removed in future.
@ -416,6 +419,13 @@ func (b *AtomicBool) UnmarshalText(text []byte) error {
return nil
}
// LogBackup is the config for log backup service.
// For now, it includes the embed advancer.
type LogBackup struct {
	// Advancer carries the configuration of the embedded checkpoint advancer.
	Advancer logbackupconf.Config `toml:"advancer" json:"advancer"`
	// Enabled controls whether the log backup service is started.
	Enabled bool `toml:"enabled" json:"enabled"`
}
// Log is the log section of config.
type Log struct {
// Log level.
@ -942,6 +952,10 @@ var defaultConf = Config{
NewCollationsEnabledOnFirstBootstrap: true,
EnableGlobalKill: true,
TrxSummary: DefaultTrxSummary(),
LogBackup: LogBackup{
Advancer: logbackupconf.Default(),
Enabled: false,
},
}
var (

View File

@ -28,6 +28,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
ddlutil "github.com/pingcap/tidb/ddl/util"
@ -92,6 +93,7 @@ type Domain struct {
indexUsageSyncLease time.Duration
dumpFileGcChecker *dumpFileGcChecker
expiredTimeStamp4PC types.Time
logBackupAdvancer *streamhelper.AdvancerDaemon
serverID uint64
serverIDSession *concurrency.Session
@ -889,10 +891,33 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain)
do.wg.Add(1)
go do.topologySyncerKeeper()
}
err = do.initLogBackup(ctx, pdClient)
if err != nil {
return err
}
return nil
}
// initLogBackup starts the embedded log backup checkpoint advancer when it is
// enabled in the global config; otherwise it is a no-op.
func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error {
	cfg := config.GetGlobalConfig()
	if !cfg.LogBackup.Enabled {
		return nil
	}
	env, err := streamhelper.TiDBEnv(pdClient, do.etcdClient, cfg)
	if err != nil {
		return err
	}
	adv := streamhelper.NewCheckpointAdvancer(env)
	adv.UpdateConfig(cfg.LogBackup.Advancer)
	do.logBackupAdvancer = streamhelper.NewAdvancerDaemon(adv, streamhelper.OwnerManagerForLogBackup(ctx, do.etcdClient))
	loop, err := do.logBackupAdvancer.Begin(ctx)
	if err != nil {
		return err
	}
	// Run the advancer loop under the domain's wait group so that shutdown
	// waits for it to exit.
	do.wg.Run(loop)
	return nil
}
type sessionPool struct {
resources chan pools.Resource
factory pools.Factory

2
go.mod
View File

@ -46,7 +46,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a
github.com/pingcap/kvproto v0.0.0-20220705090230-a5d4ffd2ba33
github.com/pingcap/log v1.1.0
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e

4
go.sum
View File

@ -665,8 +665,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17Xtb
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a h1:nP2wmyw9JTRsk5rm+tZtfAso6c/1FvuaFNbXTaYz3FE=
github.com/pingcap/kvproto v0.0.0-20220705053936-aa9c2d20cd2a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220705090230-a5d4ffd2ba33 h1:VKMmvYhtG28j1sCCBdq4s+V9UOYqNgQ6CQviQwOgTeg=
github.com/pingcap/kvproto v0.0.0-20220705090230-a5d4ffd2ba33/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=

51
metrics/log_backup.go Normal file
View File

@ -0,0 +1,51 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
)
// log backup metrics.
// see the `Help` field for details.
var (
	// LastCheckpoint reports, per task, the last global checkpoint TS.
	LastCheckpoint = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "tidb",
		Subsystem: "log_backup",
		Name: "last_checkpoint",
		Help: "The last global checkpoint of log backup.",
	}, []string{"task"})
	// AdvancerOwner is a 0/1 gauge flagging whether this node owns the advancer.
	AdvancerOwner = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "tidb",
		Subsystem: "log_backup",
		Name: "advancer_owner",
		Help: "If the node is the owner of advancers, set this to `1`, otherwise `0`.",
		ConstLabels: map[string]string{},
	})
	// AdvancerTickDuration records per-step latency of the advancer tick,
	// with exponential buckets from 10ms up.
	AdvancerTickDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "tidb",
		Subsystem: "log_backup",
		Name: "advancer_tick_duration_sec",
		Help: "The time cost of each step during advancer ticking.",
		Buckets: prometheus.ExponentialBuckets(0.01, 3.0, 8),
	}, []string{"step"})
	// GetCheckpointBatchSize records batch sizes (by type) for region scans
	// and checkpoint fetches, with power-of-two buckets.
	GetCheckpointBatchSize = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "tidb",
		Subsystem: "log_backup",
		Name: "advancer_batch_size",
		Help: "The batch size of scanning region or get region checkpoint.",
		Buckets: prometheus.ExponentialBuckets(1, 2.0, 12),
	}, []string{"type"})
)

View File

@ -191,6 +191,10 @@ func RegisterMetrics() {
prometheus.MustRegister(StatsHealthyGauge)
prometheus.MustRegister(TxnStatusEnteringCounter)
prometheus.MustRegister(TxnDurationHistogram)
prometheus.MustRegister(LastCheckpoint)
prometheus.MustRegister(AdvancerOwner)
prometheus.MustRegister(AdvancerTickDuration)
prometheus.MustRegister(GetCheckpointBatchSize)
tikvmetrics.InitMetrics(TiDB, TiKVClient)
tikvmetrics.RegisterMetrics()