br: Enable checkpoint advancer to pause tasks whose checkpoints lag too far behind (#51441)

close pingcap/tidb#50803
This commit is contained in:
ris
2024-03-13 02:03:39 -07:00
committed by GitHub
parent 0d742d35f8
commit 7548df70b1
10 changed files with 263 additions and 24 deletions

View File

@ -68,7 +68,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 22,
shard_count = 25,
deps = [
":streamhelper",
"//br/pkg/errors",
@ -89,8 +89,10 @@ go_test(
"@com_github_pingcap_kvproto//pkg/logbackuppb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//txnkv/txnlock",

View File

@ -71,6 +71,7 @@ type CheckpointAdvancer struct {
lastCheckpoint *checkpoint
lastCheckpointMu sync.Mutex
inResolvingLock atomic.Bool
isPaused atomic.Bool
checkpoints *spans.ValueSortedFull
checkpointsMu sync.Mutex
@ -446,6 +447,14 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
log.Warn("failed to remove service GC safepoint", logutil.ShortError(err))
}
metrics.LastCheckpoint.DeleteLabelValues(e.Name)
case EventPause:
if c.task.GetName() == e.Name {
c.isPaused.CompareAndSwap(false, true)
}
case EventResume:
if c.task.GetName() == e.Name {
c.isPaused.CompareAndSwap(true, false)
}
case EventErr:
return e.Err
}
@ -544,6 +553,25 @@ func (c *CheckpointAdvancer) subscribeTick(ctx context.Context) error {
return c.subscriber.PendingErrors()
}
func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, error) {
if c.cfg.CheckPointLagLimit <= 0 {
return false, nil
}
now, err := c.env.FetchCurrentTS(ctx)
if err != nil {
return false, err
}
lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
if lagDuration > c.cfg.CheckPointLagLimit {
log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
zap.Stringer("lag", lagDuration))
return true, nil
}
return false, nil
}
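For readers less familiar with TSO arithmetic, the lag check above just extracts the physical (wall-clock) part of two timestamps and subtracts them. A minimal self-contained sketch, with fabricated timestamps purely for illustration:

package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

func main() {
	// Pretend "now" and the last checkpoint are TSOs two minutes apart.
	now := oracle.ComposeTS(time.Now().UnixMilli(), 0)
	checkpoint := oracle.ComposeTS(time.Now().Add(-2*time.Minute).UnixMilli(), 0)

	// Same computation as isCheckpointLagged: compare the physical parts.
	lag := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(checkpoint))
	fmt.Println(lag > time.Minute) // true once the lag exceeds the configured limit
}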
func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
c.checkpointsMu.Lock()
c.setCheckpoint(ctx, c.checkpoints.Min())
@ -551,6 +579,17 @@ func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
if err := c.env.UploadV3GlobalCheckpointForTask(ctx, c.task.Name, c.lastCheckpoint.TS); err != nil {
return errors.Annotate(err, "failed to upload global checkpoint")
}
isLagged, err := c.isCheckpointLagged(ctx)
if err != nil {
return errors.Annotate(err, "failed to check timestamp")
}
if isLagged {
err := c.env.PauseTask(ctx, c.task.Name)
if err != nil {
return errors.Annotate(err, "failed to pause task")
}
return errors.Annotate(errors.Errorf("check point lagged too large"), "check point lagged too large")
}
p, err := c.env.BlockGCUntil(ctx, c.lastCheckpoint.safeTS())
if err != nil {
return errors.Annotatef(err,
@ -606,7 +645,7 @@ func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil {
if c.task == nil || c.isPaused.Load() {
log.Debug("No tasks yet, skipping advancing.")
return nil
}
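Taken together, the changes in this file form a small atomic gate: the etcd-driven event handler flips isPaused, and tick refuses to do any work while it is set. A stripped-down sketch of the same pattern (the names below are illustrative, not the advancer's API):

package main

import (
	"fmt"
	"sync/atomic"
)

// pauseGate mimics how the advancer reacts to EventPause/EventResume.
type pauseGate struct{ paused atomic.Bool }

func (g *pauseGate) onEvent(ev string) {
	switch ev {
	case "pause":
		g.paused.CompareAndSwap(false, true)
	case "resume":
		g.paused.CompareAndSwap(true, false)
	}
}

// tick does nothing while the gate is paused, like CheckpointAdvancer.tick.
func (g *pauseGate) tick() bool { return !g.paused.Load() }

func main() {
	g := &pauseGate{}
	g.onEvent("pause")
	fmt.Println(g.tick()) // false: ticks are skipped while paused
	g.onEvent("resume")
	fmt.Println(g.tick()) // true: advancing resumes
}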

View File

@ -29,6 +29,8 @@ const (
EventAdd EventType = iota
EventDel
EventErr
EventPause
EventResume
)
func (t EventType) String() string {
@ -39,6 +41,10 @@ func (t EventType) String() string {
return "Del"
case EventErr:
return "Err"
case EventPause:
return "Pause"
case EventResume:
return "Resume"
}
return "Unknown"
}
@ -70,29 +76,47 @@ func errorEvent(err error) TaskEvent {
}
func (t AdvancerExt) toTaskEvent(ctx context.Context, event *clientv3.Event) (TaskEvent, error) {
if !bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfTask())) {
return TaskEvent{}, errors.Annotatef(berrors.ErrInvalidArgument,
"the path isn't a task path (%s)", string(event.Kv.Key))
te := TaskEvent{}
var prefix string
if bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfTask())) {
prefix = PrefixOfTask()
te.Name = strings.TrimPrefix(string(event.Kv.Key), prefix)
} else if bytes.HasPrefix(event.Kv.Key, []byte(PrefixOfPause())) {
prefix = PrefixOfPause()
te.Name = strings.TrimPrefix(string(event.Kv.Key), prefix)
} else {
return TaskEvent{},
errors.Annotatef(berrors.ErrInvalidArgument, "the path isn't a task/pause path (%s)",
string(event.Kv.Key))
}
te := TaskEvent{}
te.Name = strings.TrimPrefix(string(event.Kv.Key), PrefixOfTask())
if event.Type == clientv3.EventTypeDelete {
te.Type = EventDel
} else if event.Type == clientv3.EventTypePut {
switch {
case event.Type == clientv3.EventTypePut && prefix == PrefixOfTask():
te.Type = EventAdd
} else {
return TaskEvent{}, errors.Annotatef(berrors.ErrInvalidArgument, "event type is wrong (%s)", event.Type)
case event.Type == clientv3.EventTypeDelete && prefix == PrefixOfTask():
te.Type = EventDel
case event.Type == clientv3.EventTypePut && prefix == PrefixOfPause():
te.Type = EventPause
case event.Type == clientv3.EventTypeDelete && prefix == PrefixOfPause():
te.Type = EventResume
default:
return TaskEvent{},
errors.Annotatef(berrors.ErrInvalidArgument,
"invalid event type or prefix: type=%s, prefix=%s", event.Type, prefix)
}
te.Info = new(backuppb.StreamBackupTaskInfo)
if err := proto.Unmarshal(event.Kv.Value, te.Info); err != nil {
return TaskEvent{}, err
}
var err error
te.Ranges, err = t.MetaDataClient.TaskByInfo(*te.Info).Ranges(ctx)
if err != nil {
return TaskEvent{}, err
}
return te, nil
}
@ -113,7 +137,10 @@ func (t AdvancerExt) eventFromWatch(ctx context.Context, resp clientv3.WatchResp
}
func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskEvent) {
c := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev))
taskCh := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev))
pauseCh := t.Client.Watcher.Watch(ctx, PrefixOfPause(), clientv3.WithPrefix(), clientv3.WithRev(rev))
// handleResponse converts a watch response into task events and forwards them; it reports whether listening should continue.
handleResponse := func(resp clientv3.WatchResponse) bool {
events, err := t.eventFromWatch(ctx, resp)
if err != nil {
@ -127,22 +154,27 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
}
return true
}
// collectRemaining drains whatever is still buffered in the watch channels before the listener shuts down.
collectRemaining := func() {
log.Info("Start collecting remaining events in the channel.", zap.String("category", "log backup advancer"),
zap.Int("remained", len(c)))
zap.Int("remained", len(taskCh)))
defer log.Info("Finish collecting remaining events in the channel.", zap.String("category", "log backup advancer"))
for {
select {
case resp, ok := <-c:
if !ok {
return
}
if !handleResponse(resp) {
return
}
default:
if taskCh == nil && pauseCh == nil {
return
}
select {
case resp, ok := <-taskCh:
if !ok || !handleResponse(resp) {
taskCh = nil
}
case resp, ok := <-pauseCh:
if !ok || !handleResponse(resp) {
pauseCh = nil
}
}
}
}
@ -150,7 +182,7 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
defer close(ch)
for {
select {
case resp, ok := <-c:
case resp, ok := <-taskCh:
failpoint.Inject("advancer_close_channel", func() {
// We cannot really close the channel, just simulating it.
ok = false
@ -162,6 +194,18 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
if !handleResponse(resp) {
return
}
case resp, ok := <-pauseCh:
failpoint.Inject("advancer_close_pause_channel", func() {
// We cannot really close the channel, just simulating it.
ok = false
})
if !ok {
ch <- errorEvent(io.EOF)
return
}
if !handleResponse(resp) {
return
}
case <-ctx.Done():
collectRemaining()
ch <- errorEvent(ctx.Err())
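The listener now multiplexes two etcd watch streams. The underlying Go idiom, selecting over several channels and retiring a closed one by setting it to nil so its case stops firing, is sketched below in isolation (generic names, not advancer code):

package main

import "fmt"

// fanIn drains two channels until both are closed; a closed channel is set to
// nil so the select no longer picks its case, mirroring how collectRemaining
// retires taskCh and pauseCh above.
func fanIn(a, b <-chan string, handle func(string)) {
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil
				continue
			}
			handle(v)
		case v, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			handle(v)
		}
	}
}

func main() {
	a, b := make(chan string, 1), make(chan string, 1)
	a <- "task-event"
	b <- "pause-event"
	close(a)
	close(b)
	fanIn(a, b, func(v string) { fmt.Println(v) })
}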

View File

@ -10,6 +10,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
@ -48,6 +49,11 @@ func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, e
return c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at)
}
// TODO: It should be able to synchronize the current TS with PD.
func (c PDRegionScanner) FetchCurrentTS(ctx context.Context) (uint64, error) {
return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil
}
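The TODO above notes that the TS is derived from the local clock rather than from PD. A hypothetical PD-backed variant, not part of this commit and assuming access to a pd.Client (the oracle and pd imports are already present in this file), could look like:

// Hypothetical alternative to the local-clock version above.
func fetchCurrentTSFromPD(ctx context.Context, cli pd.Client) (uint64, error) {
	physical, logical, err := cli.GetTS(ctx)
	if err != nil {
		return 0, err
	}
	return oracle.ComposeTS(physical, logical), nil
}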
// RegionScan gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
func (c PDRegionScanner) RegionScan(ctx context.Context, key, endKey []byte, limit int) ([]RegionWithLeader, error) {
@ -152,6 +158,7 @@ type StreamMeta interface {
UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error
// ClearV3GlobalCheckpointForTask clears the global checkpoint to the meta store.
ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error
PauseTask(ctx context.Context, taskName string) error
}
var _ tikv.RegionLockResolver = &AdvancerLockResolver{}

View File

@ -17,6 +17,7 @@ import (
"github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
"github.com/pingcap/tidb/pkg/kv"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
@ -463,3 +464,80 @@ func TestRemoveTaskAndFlush(t *testing.T) {
return !adv.HasSubscribion()
}, 10*time.Second, 100*time.Millisecond)
}
func TestEnableCheckPointLimit(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
for i := 0; i < 5; i++ {
c.advanceClusterTimeBy(30 * time.Second)
c.advanceCheckpointBy(20 * time.Second)
require.NoError(t, adv.OnTick(ctx))
}
}
func TestCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// after some time, isPaused will be set and subsequent ticks are skipped
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
}
func TestCheckPointResume(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := &testEnv{fakeCluster: c, testCtx: t}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
// now the checkpoint issue is fixed and the task is resumed
c.advanceCheckpointBy(1 * time.Minute)
env.ResumeTask(ctx)
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
}, 5*time.Second, 100*time.Millisecond)
// as time passes, the checkpoint lag will exceed the limit again
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
}

View File

@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
@ -102,6 +103,7 @@ type fakeCluster struct {
onGetClient func(uint64) error
onClearCache func(uint64) error
serviceGCSafePoint uint64
currentTS uint64
}
func (r *region) splitAt(newID uint64, k string) *region {
@ -273,6 +275,10 @@ func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, erro
return at, nil
}
func (f *fakeCluster) FetchCurrentTS(ctx context.Context) (uint64, error) {
return f.currentTS, nil
}
// RegionScan gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
func (f *fakeCluster) RegionScan(ctx context.Context, key []byte, endKey []byte, limit int) ([]streamhelper.RegionWithLeader, error) {
@ -490,6 +496,29 @@ func (f *fakeCluster) advanceCheckpoints() uint64 {
return minCheckpoint
}
func (f *fakeCluster) advanceCheckpointBy(duration time.Duration) uint64 {
minCheckpoint := uint64(math.MaxUint64)
for _, r := range f.regions {
f.updateRegion(r.id, func(r *region) {
newCheckpointTime := oracle.GetTimeFromTS(r.checkpoint.Load()).Add(duration)
newCheckpoint := oracle.GoTimeToTS(newCheckpointTime)
r.checkpoint.Store(newCheckpoint)
if newCheckpoint < minCheckpoint {
minCheckpoint = newCheckpoint
}
r.fsim.flushedEpoch.Store(0)
})
}
log.Info("checkpoint updated", zap.Uint64("to", minCheckpoint))
return minCheckpoint
}
func (f *fakeCluster) advanceClusterTimeBy(duration time.Duration) uint64 {
newTime := oracle.GoTimeToTS(oracle.GetTimeFromTS(f.currentTS).Add(duration))
f.currentTS = newTime
return newTime
}
func createFakeCluster(t *testing.T, n int, simEnabled bool) *fakeCluster {
c := &fakeCluster{
stores: map[uint64]*fakeStore{},
@ -654,6 +683,22 @@ func (t *testEnv) ClearV3GlobalCheckpointForTask(ctx context.Context, taskName s
return nil
}
func (t *testEnv) PauseTask(ctx context.Context, taskName string) error {
t.taskCh <- streamhelper.TaskEvent{
Type: streamhelper.EventPause,
Name: taskName,
}
return nil
}
func (t *testEnv) ResumeTask(ctx context.Context) error {
t.taskCh <- streamhelper.TaskEvent{
Type: streamhelper.EventResume,
Name: "whole",
}
return nil
}
func (t *testEnv) getCheckpoint() uint64 {
t.mu.Lock()
defer t.mu.Unlock()

View File

@ -17,6 +17,7 @@ const (
DefaultConsistencyCheckTick = 5
DefaultTryAdvanceThreshold = 4 * time.Minute
DefaultCheckPointLagLimit = 0
DefaultBackOffTime = 5 * time.Second
DefaultTickInterval = 12 * time.Second
DefaultFullScanTick = 4
@ -34,6 +35,8 @@ type Config struct {
TickDuration time.Duration `toml:"tick-interval" json:"tick-interval"`
// The threshold for polling TiKV for checkpoint of some range.
TryAdvanceThreshold time.Duration `toml:"try-advance-threshold" json:"try-advance-threshold"`
// The maximum lag that can be tolerated for the checkpoint.
CheckPointLagLimit time.Duration `toml:"check-point-lag-limit" json:"check-point-lag-limit"`
}
func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) {
@ -50,6 +53,7 @@ func Default() Config {
BackoffTime: DefaultBackOffTime,
TickDuration: DefaultTickInterval,
TryAdvanceThreshold: DefaultTryAdvanceThreshold,
CheckPointLagLimit: DefaultCheckPointLagLimit,
}
}
@ -76,6 +80,11 @@ func (conf Config) GetDefaultStartPollThreshold() time.Duration {
return conf.TryAdvanceThreshold
}
// GetCheckPointLagLimit returns the maximum lag that can be tolerated for the checkpoint.
func (conf Config) GetCheckPointLagLimit() time.Duration {
return conf.CheckPointLagLimit
}
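The limit defaults to 0, which keeps the new check disabled (see the early return in isCheckpointLagged). A short sketch of enabling it from a consumer of this package, mirroring what the new tests do; the 48-hour value is only illustrative:

cfg := config.Default()
cfg.CheckPointLagLimit = 48 * time.Hour // 0 (the default) disables the lag check entirely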
// GetSubscriberErrorStartPollThreshold returns the threshold of begin polling the checkpoint
// when the subscriber meets error.
func (conf Config) GetSubscriberErrorStartPollThreshold() time.Duration {

View File

@ -94,6 +94,12 @@ func Pause(task string) string {
return path.Join(streamKeyPrefix, taskPausePath, task)
}
// PrefixOfPause returns the key prefix under which task pause markers live.
// Normally it would be <prefix>/pause/
func PrefixOfPause() string {
return path.Join(streamKeyPrefix, taskPausePath) + "/"
}
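Operationally, a key under this prefix is what the advancer's new watch reacts to: a Put is classified as EventPause and a Delete as EventResume. A conceptual sketch, assuming a go.etcd.io/etcd/client/v3 client; real callers should go through higher-level helpers such as StreamMeta.PauseTask rather than raw etcd calls:

// togglePause pauses then resumes a task by writing and removing its pause marker.
func togglePause(ctx context.Context, cli *clientv3.Client, task string) error {
	key := Pause(task) // <prefix>/pause/<task>, covered by PrefixOfPause()
	if _, err := cli.Put(ctx, key, ""); err != nil { // watcher -> EventPause
		return err
	}
	_, err := cli.Delete(ctx, key) // watcher -> EventResume
	return err
}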
// LastErrorPrefixOf make the prefix for searching last error by some task.
func LastErrorPrefixOf(task string) string {
return strings.TrimSuffix(path.Join(streamKeyPrefix, taskLastErrorPath, task), "/") + "/"

View File

@ -42,6 +42,8 @@ type TiKVClusterMeta interface {
// NOTE: once we support multi tasks, perhaps we need to allow the caller to provide a namespace.
// For now, all tasks (exactly one task in fact) use the same checkpoint.
BlockGCUntil(ctx context.Context, at uint64) (uint64, error)
FetchCurrentTS(ctx context.Context) (uint64, error)
}
type Store struct {

View File

@ -8,6 +8,7 @@ import (
"fmt"
"strings"
"testing"
"time"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/logutil"
@ -16,6 +17,7 @@ import (
"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
"github.com/pingcap/tidb/pkg/kv"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@ -81,6 +83,11 @@ func (c constantRegions) BlockGCUntil(ctx context.Context, at uint64) (uint64, e
return 0, status.Error(codes.Unimplemented, "Unsupported operation")
}
// TODO: It should be able to synchronize the current TS with PD.
func (c constantRegions) FetchCurrentTS(ctx context.Context) (uint64, error) {
return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil
}
func makeSubrangeRegions(keys ...string) constantRegions {
if len(keys) == 0 {
return nil