diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel index d8c3f3f9be..d8cf18d8ff 100644 --- a/br/pkg/lightning/backend/local/BUILD.bazel +++ b/br/pkg/lightning/backend/local/BUILD.bazel @@ -162,6 +162,7 @@ go_test( "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//errs", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", diff --git a/br/pkg/lightning/backend/local/checksum.go b/br/pkg/lightning/backend/local/checksum.go index 7c7c455146..7510417607 100644 --- a/br/pkg/lightning/backend/local/checksum.go +++ b/br/pkg/lightning/backend/local/checksum.go @@ -294,12 +294,33 @@ func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo return nil, err } +var retryGetTSInterval = time.Second + // Checksum implements the ChecksumManager interface. func (e *TiKVChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) { tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name) - physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx) - if err != nil { - return nil, errors.Annotate(err, "fetch tso from pd failed") + var ( + physicalTS, logicalTS int64 + err error + retryTime int + ) + physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx) + for err != nil { + if !pd.IsLeaderChange(err) { + return nil, errors.Annotate(err, "fetch tso from pd failed") + } + retryTime++ + if retryTime%60 == 0 { + log.FromContext(ctx).Warn("fetch tso from pd failed and retrying", + zap.Int("retryTime", retryTime), + zap.Error(err)) + } + select { + case <-ctx.Done(): + err = ctx.Err() + case <-time.After(retryGetTSInterval): + physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx) + } } ts := oracle.ComposeTS(physicalTS, logicalTS) if err := e.manager.addOneJob(ctx, tbl, ts); err != nil { diff --git a/br/pkg/lightning/backend/local/checksum_test.go b/br/pkg/lightning/backend/local/checksum_test.go index d003871574..257e59c41d 100644 --- a/br/pkg/lightning/backend/local/checksum_test.go +++ b/br/pkg/lightning/backend/local/checksum_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/errs" "go.uber.org/atomic" ) @@ -197,6 +198,18 @@ func TestDoChecksumWithTikv(t *testing.T) { require.Zero(t, checksumExec.manager.currentTS) require.Equal(t, 0, len(checksumExec.manager.tableGCSafeTS)) } + + // test PD leader change error + backup := retryGetTSInterval + retryGetTSInterval = time.Millisecond + t.Cleanup(func() { + retryGetTSInterval = backup + }) + pdClient.leaderChanging = true + kvClient.maxErrCount = 0 + checksumExec := &TiKVChecksumManager{manager: newGCTTLManager(pdClient), client: kvClient} + _, err := checksumExec.Checksum(ctx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo}) + require.NoError(t, err) } func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) { @@ -264,6 +277,7 @@ type testPDClient struct { count atomic.Int32 gcSafePoint []safePointTTL logicalTSCounter atomic.Uint64 + leaderChanging bool } func (c *testPDClient) currentSafePoint() uint64 { @@ -280,6 +294,9 @@ func (c *testPDClient) currentSafePoint() uint64 { func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) { physicalTS := time.Now().UnixMilli() + if c.leaderChanging && physicalTS%2 == 0 { + return 0, 0, errs.ErrClientTSOStreamClosed + } logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc()) return physicalTS, logicalTS, nil }