br: add chaos testing for advancer owner (#58183)
close pingcap/tidb#50458
@@ -90,8 +90,8 @@ func (c *CheckpointAdvancer) HasTask() bool {
 	return c.task != nil
 }
 
-// HasSubscriber returns whether the advancer is associated with a subscriber.
-func (c *CheckpointAdvancer) HasSubscribion() bool {
+// HasSubscriptions returns whether the advancer is associated with a subscriber.
+func (c *CheckpointAdvancer) HasSubscriptions() bool {
 	c.subscriberMu.Lock()
 	defer c.subscriberMu.Unlock()
 
@@ -117,7 +117,7 @@ func newCheckpointWithTS(ts uint64) *checkpoint {
 	}
 }
 
-func NewCheckpointWithSpan(s spans.Valued) *checkpoint {
+func newCheckpointWithSpan(s spans.Valued) *checkpoint {
 	return &checkpoint{
 		StartKey: s.Key.StartKey,
 		EndKey:   s.Key.EndKey,
@@ -270,11 +270,6 @@ func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) {
 	f(c.checkpoints)
 }
 
-// only used for test
-func (c *CheckpointAdvancer) NewCheckpoints(cps *spans.ValueSortedFull) {
-	c.checkpoints = cps
-}
-
 func (c *CheckpointAdvancer) fetchRegionHint(ctx context.Context, startKey []byte) string {
 	region, err := locateKeyOfRegion(ctx, c.env, startKey)
 	if err != nil {
@@ -473,7 +468,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
 }
 
 func (c *CheckpointAdvancer) setCheckpoint(s spans.Valued) bool {
-	cp := NewCheckpointWithSpan(s)
+	cp := newCheckpointWithSpan(s)
 	if cp.TS < c.lastCheckpoint.TS {
 		log.Warn("failed to update global checkpoint: stale",
 			zap.Uint64("old", c.lastCheckpoint.TS), zap.Uint64("new", cp.TS))
@@ -474,7 +474,7 @@ func TestRemoveTaskAndFlush(t *testing.T) {
 	}, 10*time.Second, 100*time.Millisecond)
 	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop"))
 	require.Eventually(t, func() bool {
-		return !adv.HasSubscribion()
+		return !adv.HasSubscriptions()
 	}, 10*time.Second, 100*time.Millisecond)
 }
 
@@ -11,18 +11,21 @@ import (
 const (
 	flagBackoffTime         = "backoff-time"
 	flagTickInterval        = "tick-interval"
 	flagFullScanDiffTick    = "full-scan-tick"
 	flagAdvancingByCache    = "advancing-by-cache"
 	flagTryAdvanceThreshold = "try-advance-threshold"
 	flagCheckPointLagLimit  = "check-point-lag-limit"
 
 	DefaultConsistencyCheckTick = 5
-	DefaultTryAdvanceThreshold  = 4 * time.Minute
-	DefaultCheckPointLagLimit   = 48 * time.Hour
-	DefaultBackOffTime          = 5 * time.Second
-	DefaultTickInterval         = 12 * time.Second
 	DefaultFullScanTick         = 4
 	DefaultAdvanceByCache       = true
+
+	// used for chaos testing
+	flagOwnershipCycleInterval = "ownership-cycle-interval"
 )
 
+const (
+	DefaultTryAdvanceThreshold = 4 * time.Minute
+	DefaultCheckPointLagLimit  = 48 * time.Hour
+	DefaultBackOffTime         = 5 * time.Second
+	DefaultTickInterval        = 12 * time.Second
+
+	// used for chaos testing, default to disable
+	DefaultOwnershipCycleInterval = 0
+)
+
 var (
@@ -38,6 +41,11 @@ type Config struct {
 	TryAdvanceThreshold time.Duration `toml:"try-advance-threshold" json:"try-advance-threshold"`
 	// The maximum lag could be tolerated for the checkpoint lag.
 	CheckPointLagLimit time.Duration `toml:"check-point-lag-limit" json:"check-point-lag-limit"`
+
+	// The following configs are used in chaos testing; better not to enable them in prod.
+	//
+	// Used to periodically become/retire as the advancer owner.
+	OwnershipCycleInterval time.Duration `toml:"ownership-cycle-interval" json:"ownership-cycle-interval"`
 }
 
 func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) {
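Since the new field only takes effect when non-zero (see the `OwnershipCycleInterval > 0` check in `RunStreamAdvancer` below), a config that omits `ownership-cycle-interval` keeps chaos cycling off. A minimal, runnable sketch of how the tags added above serialize — the trimmed `miniConfig` stand-in is illustrative, not the real `Config`:

    // Sketch: a trimmed stand-in for the advancer Config, showing how
    // ownership-cycle-interval round-trips through the json tags above.
    // (The real struct lives in br/pkg/streamhelper/config.)
    package main

    import (
    	"encoding/json"
    	"fmt"
    	"time"
    )

    type miniConfig struct {
    	CheckPointLagLimit     time.Duration `toml:"check-point-lag-limit" json:"check-point-lag-limit"`
    	OwnershipCycleInterval time.Duration `toml:"ownership-cycle-interval" json:"ownership-cycle-interval"`
    }

    func main() {
    	c := miniConfig{
    		CheckPointLagLimit:     48 * time.Hour,
    		OwnershipCycleInterval: 30 * time.Second,
    	}
    	out, _ := json.Marshal(c)
    	// encoding/json renders time.Duration as integer nanoseconds.
    	fmt.Println(string(out))
    }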
@@ -49,14 +57,22 @@ func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) {
 		"If the checkpoint lag is greater than how long, we would try to poll TiKV for checkpoints.")
 	f.Duration(flagCheckPointLagLimit, DefaultCheckPointLagLimit,
 		"The maximum lag could be tolerated for the checkpoint lag.")
+
+	// used for chaos testing
+	f.Duration(flagOwnershipCycleInterval, DefaultOwnershipCycleInterval,
+		"The interval at which the owner retires itself.")
+
+	// mark hidden
+	_ = f.MarkHidden(flagOwnershipCycleInterval)
 }
 
 func Default() Config {
 	return Config{
-		BackoffTime:         DefaultBackOffTime,
-		TickDuration:        DefaultTickInterval,
-		TryAdvanceThreshold: DefaultTryAdvanceThreshold,
-		CheckPointLagLimit:  DefaultCheckPointLagLimit,
+		BackoffTime:            DefaultBackOffTime,
+		TickDuration:           DefaultTickInterval,
+		TryAdvanceThreshold:    DefaultTryAdvanceThreshold,
+		CheckPointLagLimit:     DefaultCheckPointLagLimit,
+		OwnershipCycleInterval: DefaultOwnershipCycleInterval,
 	}
 }
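The definition above pairs a zero default with `MarkHidden`, so the knob exists for chaos runs but never shows up in `--help`. A self-contained sketch of that pattern with spf13/pflag — the flag set name and sample command line here are made up for the demo:

    // Sketch of the hidden, zero-default duration flag pattern used above.
    package main

    import (
    	"fmt"

    	"github.com/spf13/pflag"
    )

    func main() {
    	f := pflag.NewFlagSet("advancer", pflag.ContinueOnError)
    	f.Duration("ownership-cycle-interval", 0, "The interval at which the owner retires itself.")
    	// MarkHidden keeps the flag usable but omits it from help output,
    	// which fits a chaos-testing knob end users should not reach for.
    	_ = f.MarkHidden("ownership-cycle-interval")

    	_ = f.Parse([]string{"--ownership-cycle-interval=30s"})
    	d, err := f.GetDuration("ownership-cycle-interval")
    	fmt.Println(d, err) // 30s <nil>
    }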
@@ -78,6 +94,10 @@ func (conf *Config) GetFromFlags(f *pflag.FlagSet) error {
 	if err != nil {
 		return err
 	}
+	conf.OwnershipCycleInterval, err = f.GetDuration(flagOwnershipCycleInterval)
+	if err != nil {
+		return err
+	}
 	return nil
 }
 
@@ -102,3 +102,11 @@ func (od *OwnerDaemon) Begin(ctx context.Context) (func(), error) {
 	}
 	return loop, nil
 }
+
+// ForceToBeOwner forces the daemon to become the owner immediately.
+func (od *OwnerDaemon) ForceToBeOwner(ctx context.Context) error {
+	return od.manager.ForceToBeOwner(ctx)
+}
+
+// RetireIfOwner retires the daemon from ownership if it currently holds it.
+func (od *OwnerDaemon) RetireIfOwner() {
+	od.manager.RetireOwner()
+}
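Both methods are thin delegations to the embedded owner manager, which keeps the chaos surface down to exactly two entry points: become the owner, give it up. A sketch of how a driver or test might exercise that pair behind a small interface — the `ownerToggler` interface and `fakeOwner` below are hypothetical scaffolding, not part of this PR:

    // Sketch: driving become/retire against a fake owner for testing.
    // The real daemon.OwnerDaemon is a concrete type backed by an owner manager.
    package main

    import (
    	"context"
    	"fmt"
    )

    type ownerToggler interface {
    	ForceToBeOwner(ctx context.Context) error
    	RetireIfOwner()
    }

    type fakeOwner struct{ isOwner bool }

    func (f *fakeOwner) ForceToBeOwner(ctx context.Context) error { f.isOwner = true; return nil }
    func (f *fakeOwner) RetireIfOwner()                           { f.isOwner = false }

    func main() {
    	f := &fakeOwner{}
    	var t ownerToggler = f
    	_ = t.ForceToBeOwner(context.Background())
    	fmt.Println("owner after force:", f.isOwner) // true
    	t.RetireIfOwner()
    	fmt.Println("owner after retire:", f.isOwner) // false
    }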
@@ -149,7 +149,6 @@ func TestDaemon(t *testing.T) {
 	ow.RetireOwner()
 	req.False(ow.IsOwner())
 	app.AssertNotRunning(1 * time.Second)
-	ow.CampaignOwner()
 	req.Eventually(func() bool {
 		return ow.IsOwner()
 	}, 1*time.Second, 100*time.Millisecond)
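Worth noting (an inference from the test, not stated in the PR): the explicit `ow.CampaignOwner()` after `RetireOwner()` appears to be redundant because the daemon's campaign loop re-acquires ownership on its own; the `req.Eventually` assertion that `IsOwner()` turns true again is what pins down that behavior.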
@@ -919,6 +919,8 @@ func RunStreamResume(
 func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error {
 	ctx, cancel := context.WithCancel(c)
 	defer cancel()
+	log.Info("starting", zap.String("cmd", cmdName))
+
 	mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
 		cfg.CheckRequirements, false, conn.StreamVersionChecker)
 	if err != nil {
@@ -941,10 +943,46 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre
 	if err != nil {
 		return err
 	}
+	if cfg.AdvancerCfg.OwnershipCycleInterval > 0 {
+		err = advancerd.ForceToBeOwner(ctx)
+		if err != nil {
+			return err
+		}
+		log.Info("command line advancer forced to be the owner")
+		go runOwnershipCycle(ctx, advancerd, cfg.AdvancerCfg.OwnershipCycleInterval, true)
+	}
 	loop()
 	return nil
 }
 
+// runOwnershipCycle handles the periodic cycling of ownership for the advancer
+func runOwnershipCycle(ctx context.Context, advancerd *daemon.OwnerDaemon, cycleDuration time.Duration, isOwner bool) {
+	ticker := time.NewTicker(cycleDuration)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			if !isOwner {
+				// try to become owner
+				if err := advancerd.ForceToBeOwner(ctx); err != nil {
+					log.Error("command line advancer failed to force ownership", zap.Error(err))
+					continue
+				}
+				log.Info("command line advancer forced to be the owner")
+				isOwner = true
+			} else {
+				// retire from being owner
+				advancerd.RetireIfOwner()
+				log.Info("command line advancer retired from being owner")
+				isOwner = false
+			}
+		}
+	}
+}
+
 func checkConfigForStatus(pd []string) error {
 	if len(pd) == 0 {
 		return errors.Annotatef(berrors.ErrInvalidArgument,
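A note on the loop's shape: `runOwnershipCycle` starts with `isOwner` already true because `RunStreamAdvancer` only spawns it after a successful `ForceToBeOwner`, so the first tick retires rather than re-campaigns. The `time.Ticker` gives a fixed cadence, a failed `ForceToBeOwner` is simply retried on the next tick via `continue`, and the `ctx.Done()` case ties the goroutine's lifetime to the command's context, so the `defer cancel()` in `RunStreamAdvancer` also stops the chaos cycling.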
@@ -11,7 +11,6 @@ go_library(
     importpath = "github.com/pingcap/tidb/pkg/config",
     visibility = ["//visibility:public"],
     deps = [
-        "//br/pkg/streamhelper/config",
         "//pkg/parser/terror",
         "//pkg/util/logutil",
         "//pkg/util/tiflashcompute",
@@ -33,7 +33,6 @@ import (
 	"github.com/BurntSushi/toml"
 	"github.com/pingcap/errors"
 	zaplog "github.com/pingcap/log"
-	logbackupconf "github.com/pingcap/tidb/br/pkg/streamhelper/config"
 	"github.com/pingcap/tidb/pkg/parser/terror"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/pingcap/tidb/pkg/util/tiflashcompute"
@@ -458,13 +457,6 @@ func (b *AtomicBool) UnmarshalText(text []byte) error {
 	return nil
 }
 
-// LogBackup is the config for log backup service.
-// For now, it includes the embed advancer.
-type LogBackup struct {
-	Advancer logbackupconf.Config `toml:"advancer" json:"advancer"`
-	Enabled  bool                 `toml:"enabled" json:"enabled"`
-}
-
 // Log is the log section of config.
 type Log struct {
 	// Log level.