From a6cd5e7dc9f5b9336f7dc06f13f83300aebf5810 Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Tue, 31 Dec 2024 10:23:03 -0500 Subject: [PATCH] br: add chaos testing for advancer owner (#58183) close pingcap/tidb#50458 --- br/pkg/streamhelper/advancer.go | 13 ++---- br/pkg/streamhelper/advancer_test.go | 2 +- br/pkg/streamhelper/config/advancer_conf.go | 46 +++++++++++++------ br/pkg/streamhelper/daemon/owner_daemon.go | 8 ++++ .../streamhelper/daemon/owner_daemon_test.go | 1 - br/pkg/task/stream.go | 38 +++++++++++++++ pkg/config/BUILD.bazel | 1 - pkg/config/config.go | 8 ---- 8 files changed, 84 insertions(+), 33 deletions(-) diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index dab645ddb5..df595109d8 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -90,8 +90,8 @@ func (c *CheckpointAdvancer) HasTask() bool { return c.task != nil } -// HasSubscriber returns whether the advancer is associated with a subscriber. -func (c *CheckpointAdvancer) HasSubscribion() bool { +// HasSubscriptions returns whether the advancer is associated with a subscriber. +func (c *CheckpointAdvancer) HasSubscriptions() bool { c.subscriberMu.Lock() defer c.subscriberMu.Unlock() @@ -117,7 +117,7 @@ func newCheckpointWithTS(ts uint64) *checkpoint { } } -func NewCheckpointWithSpan(s spans.Valued) *checkpoint { +func newCheckpointWithSpan(s spans.Valued) *checkpoint { return &checkpoint{ StartKey: s.Key.StartKey, EndKey: s.Key.EndKey, @@ -270,11 +270,6 @@ func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) { f(c.checkpoints) } -// only used for test -func (c *CheckpointAdvancer) NewCheckpoints(cps *spans.ValueSortedFull) { - c.checkpoints = cps -} - func (c *CheckpointAdvancer) fetchRegionHint(ctx context.Context, startKey []byte) string { region, err := locateKeyOfRegion(ctx, c.env, startKey) if err != nil { @@ -473,7 +468,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error } func (c *CheckpointAdvancer) setCheckpoint(s spans.Valued) bool { - cp := NewCheckpointWithSpan(s) + cp := newCheckpointWithSpan(s) if cp.TS < c.lastCheckpoint.TS { log.Warn("failed to update global checkpoint: stale", zap.Uint64("old", c.lastCheckpoint.TS), zap.Uint64("new", cp.TS)) diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index 37aa697e67..6e2ecabe44 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -474,7 +474,7 @@ func TestRemoveTaskAndFlush(t *testing.T) { }, 10*time.Second, 100*time.Millisecond) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop")) require.Eventually(t, func() bool { - return !adv.HasSubscribion() + return !adv.HasSubscriptions() }, 10*time.Second, 100*time.Millisecond) } diff --git a/br/pkg/streamhelper/config/advancer_conf.go b/br/pkg/streamhelper/config/advancer_conf.go index 24b7674138..a2bfd9ac52 100644 --- a/br/pkg/streamhelper/config/advancer_conf.go +++ b/br/pkg/streamhelper/config/advancer_conf.go @@ -11,18 +11,21 @@ import ( const ( flagBackoffTime = "backoff-time" flagTickInterval = "tick-interval" - flagFullScanDiffTick = "full-scan-tick" - flagAdvancingByCache = "advancing-by-cache" flagTryAdvanceThreshold = "try-advance-threshold" flagCheckPointLagLimit = "check-point-lag-limit" - DefaultConsistencyCheckTick = 5 - DefaultTryAdvanceThreshold = 4 * time.Minute - DefaultCheckPointLagLimit = 48 * time.Hour - DefaultBackOffTime = 5 * time.Second - DefaultTickInterval = 12 * time.Second - DefaultFullScanTick = 4 - DefaultAdvanceByCache = true + // used for chaos testing + flagOwnershipCycleInterval = "ownership-cycle-interval" +) + +const ( + DefaultTryAdvanceThreshold = 4 * time.Minute + DefaultCheckPointLagLimit = 48 * time.Hour + DefaultBackOffTime = 5 * time.Second + DefaultTickInterval = 12 * time.Second + + // used for chaos testing, default to disable + DefaultOwnershipCycleInterval = 0 ) var ( @@ -38,6 +41,11 @@ type Config struct { TryAdvanceThreshold time.Duration `toml:"try-advance-threshold" json:"try-advance-threshold"` // The maximum lag could be tolerated for the checkpoint lag. CheckPointLagLimit time.Duration `toml:"check-point-lag-limit" json:"check-point-lag-limit"` + + // Following configs are used in chaos testings, better not to enable in prod + // + // used to periodically becomes/retire advancer owner + OwnershipCycleInterval time.Duration `toml:"ownership-cycle-interval" json:"ownership-cycle-interval"` } func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) { @@ -49,14 +57,22 @@ func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) { "If the checkpoint lag is greater than how long, we would try to poll TiKV for checkpoints.") f.Duration(flagCheckPointLagLimit, DefaultCheckPointLagLimit, "The maximum lag could be tolerated for the checkpoint lag.") + + // used for chaos testing + f.Duration(flagOwnershipCycleInterval, DefaultOwnershipCycleInterval, + "The interval that the owner will retire itself") + + // mark hidden + _ = f.MarkHidden(flagOwnershipCycleInterval) } func Default() Config { return Config{ - BackoffTime: DefaultBackOffTime, - TickDuration: DefaultTickInterval, - TryAdvanceThreshold: DefaultTryAdvanceThreshold, - CheckPointLagLimit: DefaultCheckPointLagLimit, + BackoffTime: DefaultBackOffTime, + TickDuration: DefaultTickInterval, + TryAdvanceThreshold: DefaultTryAdvanceThreshold, + CheckPointLagLimit: DefaultCheckPointLagLimit, + OwnershipCycleInterval: DefaultOwnershipCycleInterval, } } @@ -78,6 +94,10 @@ func (conf *Config) GetFromFlags(f *pflag.FlagSet) error { if err != nil { return err } + conf.OwnershipCycleInterval, err = f.GetDuration(flagOwnershipCycleInterval) + if err != nil { + return err + } return nil } diff --git a/br/pkg/streamhelper/daemon/owner_daemon.go b/br/pkg/streamhelper/daemon/owner_daemon.go index 5956b643c9..0884004c4a 100644 --- a/br/pkg/streamhelper/daemon/owner_daemon.go +++ b/br/pkg/streamhelper/daemon/owner_daemon.go @@ -102,3 +102,11 @@ func (od *OwnerDaemon) Begin(ctx context.Context) (func(), error) { } return loop, nil } + +func (od *OwnerDaemon) ForceToBeOwner(ctx context.Context) error { + return od.manager.ForceToBeOwner(ctx) +} + +func (od *OwnerDaemon) RetireIfOwner() { + od.manager.RetireOwner() +} diff --git a/br/pkg/streamhelper/daemon/owner_daemon_test.go b/br/pkg/streamhelper/daemon/owner_daemon_test.go index e7693a4c1d..2ca9b85c78 100644 --- a/br/pkg/streamhelper/daemon/owner_daemon_test.go +++ b/br/pkg/streamhelper/daemon/owner_daemon_test.go @@ -149,7 +149,6 @@ func TestDaemon(t *testing.T) { ow.RetireOwner() req.False(ow.IsOwner()) app.AssertNotRunning(1 * time.Second) - ow.CampaignOwner() req.Eventually(func() bool { return ow.IsOwner() }, 1*time.Second, 100*time.Millisecond) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index d56b9b0213..0c22db685c 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -919,6 +919,8 @@ func RunStreamResume( func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error { ctx, cancel := context.WithCancel(c) defer cancel() + log.Info("starting", zap.String("cmd", cmdName)) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, false, conn.StreamVersionChecker) if err != nil { @@ -941,10 +943,46 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre if err != nil { return err } + if cfg.AdvancerCfg.OwnershipCycleInterval > 0 { + err = advancerd.ForceToBeOwner(ctx) + if err != nil { + return err + } + log.Info("command line advancer forced to be the owner") + go runOwnershipCycle(ctx, advancerd, cfg.AdvancerCfg.OwnershipCycleInterval, true) + } loop() return nil } +// runOwnershipCycle handles the periodic cycling of ownership for the advancer +func runOwnershipCycle(ctx context.Context, advancerd *daemon.OwnerDaemon, cycleDuration time.Duration, isOwner bool) { + ticker := time.NewTicker(cycleDuration) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if !isOwner { + // try to become owner + if err := advancerd.ForceToBeOwner(ctx); err != nil { + log.Error("command line advancer failed to force ownership", zap.Error(err)) + continue + } + log.Info("command line advancer forced to be the owner") + isOwner = true + } else { + // retire from being owner + advancerd.RetireIfOwner() + log.Info("command line advancer retired from being owner") + isOwner = false + } + } + } +} + func checkConfigForStatus(pd []string) error { if len(pd) == 0 { return errors.Annotatef(berrors.ErrInvalidArgument, diff --git a/pkg/config/BUILD.bazel b/pkg/config/BUILD.bazel index c4387914a9..f3bf88df15 100644 --- a/pkg/config/BUILD.bazel +++ b/pkg/config/BUILD.bazel @@ -11,7 +11,6 @@ go_library( importpath = "github.com/pingcap/tidb/pkg/config", visibility = ["//visibility:public"], deps = [ - "//br/pkg/streamhelper/config", "//pkg/parser/terror", "//pkg/util/logutil", "//pkg/util/tiflashcompute", diff --git a/pkg/config/config.go b/pkg/config/config.go index 3df1ac5cef..ac3bf7927b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -33,7 +33,6 @@ import ( "github.com/BurntSushi/toml" "github.com/pingcap/errors" zaplog "github.com/pingcap/log" - logbackupconf "github.com/pingcap/tidb/br/pkg/streamhelper/config" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/tiflashcompute" @@ -458,13 +457,6 @@ func (b *AtomicBool) UnmarshalText(text []byte) error { return nil } -// LogBackup is the config for log backup service. -// For now, it includes the embed advancer. -type LogBackup struct { - Advancer logbackupconf.Config `toml:"advancer" json:"advancer"` - Enabled bool `toml:"enabled" json:"enabled"` -} - // Log is the log section of config. type Log struct { // Log level.