diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 42330e338a..fbdeff3414 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -127,6 +127,9 @@ func newCheckpointWithSpan(s spans.Valued) *checkpoint { } func (c *checkpoint) safeTS() uint64 { + if c.TS == 0 { + return 0 + } return c.TS - 1 } @@ -429,7 +432,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error } log.Info("get global checkpoint", zap.Uint64("checkpoint", globalCheckpointTs)) c.lastCheckpoint = newCheckpointWithTS(globalCheckpointTs) - p, err := c.env.BlockGCUntil(ctx, globalCheckpointTs-1) + p, err := c.env.BlockGCUntil(ctx, c.lastCheckpoint.safeTS()) if err != nil { log.Warn("failed to upload service GC safepoint, skipping.", logutil.ShortError(err)) } diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index c19bc503af..ea389d2724 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -677,7 +677,7 @@ func newTestEnv(c *fakeCluster, t *testing.T) *testEnv { Name: "whole", Info: &backup.StreamBackupTaskInfo{ Name: "whole", - StartTs: 5, + StartTs: 0, }, Ranges: rngs, } @@ -782,7 +782,7 @@ func (t *testEnv) putTask() { Name: "whole", Info: &backup.StreamBackupTaskInfo{ Name: "whole", - StartTs: 5, + StartTs: 0, }, Ranges: rngs, }