lightning: retry for leader change error when GetTS (#44478)
close pingcap/tidb#44263
This commit is contained in:
@ -162,6 +162,7 @@ go_test(
|
||||
"@com_github_stretchr_testify//require",
|
||||
"@com_github_tikv_client_go_v2//oracle",
|
||||
"@com_github_tikv_pd_client//:client",
|
||||
"@com_github_tikv_pd_client//errs",
|
||||
"@org_golang_google_grpc//:grpc",
|
||||
"@org_golang_google_grpc//codes",
|
||||
"@org_golang_google_grpc//status",
|
||||
|
||||
@ -294,12 +294,33 @@ func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// retryGetTSInterval is how long Checksum waits before calling GetTS again
// after a PD leader-change error. It is a variable rather than a constant so
// that tests can shorten it and restore the original value afterwards.
var retryGetTSInterval = time.Second
|
||||
|
||||
// Checksum implements the ChecksumManager interface.
|
||||
func (e *TiKVChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
|
||||
tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name)
|
||||
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Annotate(err, "fetch tso from pd failed")
|
||||
var (
|
||||
physicalTS, logicalTS int64
|
||||
err error
|
||||
retryTime int
|
||||
)
|
||||
physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx)
|
||||
for err != nil {
|
||||
if !pd.IsLeaderChange(err) {
|
||||
return nil, errors.Annotate(err, "fetch tso from pd failed")
|
||||
}
|
||||
retryTime++
|
||||
if retryTime%60 == 0 {
|
||||
log.FromContext(ctx).Warn("fetch tso from pd failed and retrying",
|
||||
zap.Int("retryTime", retryTime),
|
||||
zap.Error(err))
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
case <-time.After(retryGetTSInterval):
|
||||
physicalTS, logicalTS, err = e.manager.pdClient.GetTS(ctx)
|
||||
}
|
||||
}
|
||||
ts := oracle.ComposeTS(physicalTS, logicalTS)
|
||||
if err := e.manager.addOneJob(ctx, tbl, ts); err != nil {
|
||||
|
||||
@ -23,6 +23,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/tikv/client-go/v2/oracle"
|
||||
pd "github.com/tikv/pd/client"
|
||||
"github.com/tikv/pd/client/errs"
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
@ -197,6 +198,18 @@ func TestDoChecksumWithTikv(t *testing.T) {
|
||||
require.Zero(t, checksumExec.manager.currentTS)
|
||||
require.Equal(t, 0, len(checksumExec.manager.tableGCSafeTS))
|
||||
}
|
||||
|
||||
// test PD leader change error
|
||||
backup := retryGetTSInterval
|
||||
retryGetTSInterval = time.Millisecond
|
||||
t.Cleanup(func() {
|
||||
retryGetTSInterval = backup
|
||||
})
|
||||
pdClient.leaderChanging = true
|
||||
kvClient.maxErrCount = 0
|
||||
checksumExec := &TiKVChecksumManager{manager: newGCTTLManager(pdClient), client: kvClient}
|
||||
_, err := checksumExec.Checksum(ctx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestDoChecksumWithErrorAndLongOriginalLifetime(t *testing.T) {
|
||||
@ -264,6 +277,7 @@ type testPDClient struct {
|
||||
count atomic.Int32
|
||||
gcSafePoint []safePointTTL
|
||||
logicalTSCounter atomic.Uint64
|
||||
leaderChanging bool
|
||||
}
|
||||
|
||||
func (c *testPDClient) currentSafePoint() uint64 {
|
||||
@ -280,6 +294,9 @@ func (c *testPDClient) currentSafePoint() uint64 {
|
||||
|
||||
// GetTS is the test client's timestamp source. On success it returns the
// current wall-clock time in milliseconds as the physical part and a logical
// part extracted from the atomically incremented logicalTSCounter.
//
// When leaderChanging is set, a call that lands on an even millisecond returns
// errs.ErrClientTSOStreamClosed with zero timestamps instead.
// NOTE(review): because the failure is keyed on the wall clock, whether any
// particular call fails is nondeterministic; presumably this is meant to
// simulate a transient PD leader change so callers exercise their retry path
// — confirm.
func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) {
|
||||
physicalTS := time.Now().UnixMilli()
|
||||
// Inject the simulated leader-change error only on even milliseconds so a retry can succeed.
if c.leaderChanging && physicalTS%2 == 0 {
|
||||
return 0, 0, errs.ErrClientTSOStreamClosed
|
||||
}
|
||||
// Each successful call bumps the counter before extracting the logical part.
logicalTS := oracle.ExtractLogical(c.logicalTSCounter.Inc())
|
||||
return physicalTS, logicalTS, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user