// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package backup_test

import (
	"bytes"
	"context"
	"encoding/json"
	"math/rand"
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/golang/protobuf/proto"
	"github.com/pingcap/failpoint"
	backuppb "github.com/pingcap/kvproto/pkg/brpb"
	"github.com/pingcap/kvproto/pkg/encryptionpb"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/br/pkg/backup"
	"github.com/pingcap/tidb/br/pkg/conn"
	gluemock "github.com/pingcap/tidb/br/pkg/gluetidb/mock"
	"github.com/pingcap/tidb/br/pkg/metautil"
	"github.com/pingcap/tidb/br/pkg/mock"
	"github.com/pingcap/tidb/br/pkg/pdutil"
	"github.com/pingcap/tidb/br/pkg/rtree"
	"github.com/pingcap/tidb/br/pkg/utils"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/objstore"
	"github.com/pingcap/tidb/pkg/objstore/storeapi"
	"github.com/pingcap/tidb/pkg/testkit"
	"github.com/stretchr/testify/require"
	"github.com/tikv/client-go/v2/oracle"
	"github.com/tikv/client-go/v2/testutils"
	pd "github.com/tikv/pd/client"
	"go.opencensus.io/stats/view"
)

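// testBackup bundles the shared fixture for the backup unit tests below: a
// mock PD client and cluster, a mock glue, a backup client, and a local-disk
// backup storage.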
type testBackup struct {
	ctx    context.Context
	cancel context.CancelFunc

	mockPDClient pd.Client
	mockCluster  *testutils.MockCluster
	mockGlue     *gluemock.MockGlue
	backupClient *backup.Client

	cluster *mock.Cluster
	storage storeapi.Storage
}

// lock guards the shared mock state and avoids data races when mocking the
// behaviour of dropped stores.
var lock sync.Mutex

// connectedStore records how many times each store was connected to during a test.
var connectedStore map[uint64]int

var _ backup.BackupSender = (*mockBackupBackupSender)(nil)

func mockGetBackupClientCallBack(ctx context.Context, storeID uint64, reset bool) (backuppb.BackupClient, error) {
	lock.Lock()
	defer lock.Unlock()
	connectedStore[storeID] += 1
	// We don't need to connect to a real TiKV in unit tests, and the backup
	// responses are already mocked in `SendAsync`, so just return nil here.
	return nil, nil
}

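// mockBackupBackupSender replays canned per-store backup responses. A store
// with no responses registered behaves like a store that never answers.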
type mockBackupBackupSender struct {
	backupResponses map[uint64][]*backup.ResponseAndStore
}

func (m *mockBackupBackupSender) SendAsync(
	ctx context.Context,
	round uint64,
	storeID uint64,
	limiter *backup.ResourceConcurrentLimiter,
	request backuppb.BackupRequest,
	concurrency uint,
	cli backuppb.BackupClient,
	respCh chan *backup.ResponseAndStore,
	StateNotifier chan backup.BackupRetryPolicy,
) {
	go func() {
		defer func() {
			close(respCh)
		}()
		lock.Lock()
		resps := m.backupResponses[storeID]
		lock.Unlock()
		for len(resps) == 0 {
			// The store has no responses yet; block for a while and try
			// again, so this goroutine keeps waiting instead of returning.
			time.Sleep(100 * time.Millisecond)
			lock.Lock()
			resps = m.backupResponses[storeID]
			lock.Unlock()
		}
		for _, r := range resps {
			select {
			case <-ctx.Done():
				return
			case respCh <- r:
			}
		}
	}()
}

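// createBackupSuite starts a mock TiKV cluster and a mock TiDB cluster,
// points a backup client at them, and backs the suite's storage with a
// temporary local directory. All resources are released via t.Cleanup.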
func createBackupSuite(t *testing.T) *testBackup {
	tikvClient, mockCluster, pdClient, err := testutils.NewMockTiKV("", nil)
	require.NoError(t, err)
	s := new(testBackup)
	s.mockGlue = &gluemock.MockGlue{}
	s.mockPDClient = pdClient
	s.mockCluster = mockCluster
	s.ctx, s.cancel = context.WithCancel(context.Background())
	mockMgr := &conn.Mgr{PdController: &pdutil.PdController{}}
	mockMgr.SetPDClient(s.mockPDClient)
	s.backupClient = backup.NewBackupClient(s.ctx, mockMgr)

	s.cluster, err = mock.NewCluster()
	require.NoError(t, err)
	base := t.TempDir()
	s.storage, err = objstore.NewLocalStorage(base)
	require.NoError(t, err)
	require.NoError(t, s.cluster.Start())

	t.Cleanup(func() {
		mockMgr.Close()
		s.cluster.Stop()
		tikvClient.Close()
		pdClient.Close()
		view.Stop()
	})
	return s
}

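// TestGetTS exercises how timeago and backupts determine the backup TS:
// oracle.ExtractPhysical recovers the physical millisecond part of a TSO,
// which the assertions compare against wall-clock time within a tolerance.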
func TestGetTS(t *testing.T) {
	s := createBackupSuite(t)

	// The mockPDClient's physical TS deviates slightly from the current
	// time, so tolerate a deviation of up to 100ms.
	deviation := 100

	// timeago is zero, so it takes no effect
	expectedDuration := 0
	currentTS := time.Now().UnixMilli()
	ts, err := s.backupClient.GetTS(s.ctx, 0, 0)
	require.NoError(t, err)
	pdTS := oracle.ExtractPhysical(ts)
	duration := int(currentTS - pdTS)
	require.Greater(t, duration, expectedDuration-deviation)
	require.Less(t, duration, expectedDuration+deviation)

	// timeago = "1.5m"
	expectedDuration = 90000
	currentTS = time.Now().UnixMilli()
	ts, err = s.backupClient.GetTS(s.ctx, 90*time.Second, 0)
	require.NoError(t, err)
	pdTS = oracle.ExtractPhysical(ts)
	duration = int(currentTS - pdTS)
	require.Greater(t, duration, expectedDuration-deviation)
	require.Less(t, duration, expectedDuration+deviation)

	// timeago = "-1m"
	_, err = s.backupClient.GetTS(s.ctx, -time.Minute, 0)
	require.Error(t, err)
	require.Regexp(t, "negative timeago is not allowed.*", err.Error())

	// timeago = "1000000h" overflows
	_, err = s.backupClient.GetTS(s.ctx, 1000000*time.Hour, 0)
	require.Error(t, err)
	require.Regexp(t, ".*backup ts overflow.*", err.Error())

	// timeago = "10h" exceeds the GC safepoint
	p, l, err := s.mockPDClient.GetTS(s.ctx)
	require.NoError(t, err)
	now := oracle.ComposeTS(p, l)
	_, err = s.mockPDClient.UpdateGCSafePoint(s.ctx, now)
	require.NoError(t, err)
	_, err = s.backupClient.GetTS(s.ctx, 10*time.Hour, 0)
	require.Error(t, err)
	require.Regexp(t, ".*GC safepoint [0-9]+ exceed TS [0-9]+.*", err.Error())

	// when both timeago and backupts are set, backupts wins
	backupts := oracle.ComposeTS(p+10, l)
	ts, err = s.backupClient.GetTS(s.ctx, time.Minute, backupts)
	require.NoError(t, err)
	require.Equal(t, backupts, ts)
}

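// TestGetHistoryDDLJobs runs a sequence of DDL statements, snapshots the
// oracle TS at several points, and verifies that WriteBackupDDLJobs persists
// exactly the jobs executed between each pair of timestamps.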
func TestGetHistoryDDLJobs(t *testing.T) {
	s := createBackupSuite(t)

	tk := testkit.NewTestKit(t, s.cluster.Storage)
	lastTS1, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	require.NoErrorf(t, err, "Error getting last ts: %s", err)
	tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;")
	tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);")
	lastTS2, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	require.NoErrorf(t, err, "Error getting last ts: %s", err)
	tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;")
	tk.MustExec("DROP TABLE test_db.test_table1;")
	tk.MustExec("DROP DATABASE test_db;")
	tk.MustExec("CREATE DATABASE test_db;")
	tk.MustExec("USE test_db;")
	tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));")
	tk.MustExec("RENAME TABLE test_table1 to test_table;")
	tk.MustExec("RENAME TABLE test_table to test_table2;")
	tk.MustExec("RENAME TABLE test_table2 to test_table;")
	lastTS3, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	require.NoErrorf(t, err, "Error getting last ts: %s", err)
	tk.MustExec("TRUNCATE TABLE test_table;")
	ts, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	require.NoErrorf(t, err, "Error getting last ts: %s", err)

	checkFn := func(lastTS uint64, ts uint64, jobsCount int) {
		cipher := backuppb.CipherInfo{CipherType: encryptionpb.EncryptionMethod_PLAINTEXT}
		metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
		ctx := context.Background()
		metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
		s.mockGlue.SetSession(tk.Session())
		err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lastTS, ts, false)
		require.NoErrorf(t, err, "Error getting ddl jobs: %s", err)
		err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
		require.NoError(t, err, "Flush failed", err)
		err = metaWriter.FlushBackupMeta(ctx)
		require.NoError(t, err, "Final flush of backup meta failed", err)

		metaBytes, err := s.storage.ReadFile(ctx, metautil.MetaFile)
		require.NoError(t, err)
		mockMeta := &backuppb.BackupMeta{}
		err = proto.Unmarshal(metaBytes, mockMeta)
		require.NoError(t, err)
		// read the DDL jobs back from the meta and check their count
		metaReader := metautil.NewMetaReader(mockMeta, s.storage, &cipher)
		allDDLJobsBytes, err := metaReader.ReadDDLs(ctx)
		require.NoError(t, err)
		var allDDLJobs []*model.Job
		err = json.Unmarshal(allDDLJobsBytes, &allDDLJobs)
		require.NoError(t, err)
		require.Len(t, allDDLJobs, jobsCount)
	}

	checkFn(lastTS1, ts, 11)
	checkFn(lastTS2, ts, 9)
	checkFn(lastTS1, lastTS2, 2)
	checkFn(lastTS3, ts, 1)
}

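// TestSkipUnsupportedDDLJob checks that the two `ALTER TABLE ... attributes`
// jobs are skipped by WriteBackupDDLJobs: ten DDL statements run after
// lastTS, but only eight jobs end up in the backup meta.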
func TestSkipUnsupportedDDLJob(t *testing.T) {
	s := createBackupSuite(t)

	tk := testkit.NewTestKit(t, s.cluster.Storage)
	tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;")
	tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);")
	lastTS, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	require.NoErrorf(t, err, "Error getting last ts: %s", err)
	tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;")
	tk.MustExec("DROP TABLE test_db.test_table1;")
	tk.MustExec("DROP DATABASE test_db;")
	tk.MustExec("CREATE DATABASE test_db;")
	tk.MustExec("USE test_db;")
	tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));")
	tk.MustExec("RENAME TABLE test_table1 to test_table;")
	tk.MustExec("TRUNCATE TABLE test_table;")

	tk.MustExec("CREATE TABLE tb(id INT NOT NULL, stu_id INT NOT NULL) " +
		"PARTITION BY RANGE (stu_id) (PARTITION p0 VALUES LESS THAN (6),PARTITION p1 VALUES LESS THAN (11))")
	tk.MustExec("ALTER TABLE tb attributes \"merge_option=allow\"")
	tk.MustExec("ALTER TABLE tb PARTITION p0 attributes \"merge_option=deny\"")

	ts, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
	require.NoErrorf(t, err, "Error getting ts: %s", err)

	cipher := backuppb.CipherInfo{CipherType: encryptionpb.EncryptionMethod_PLAINTEXT}
	metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
	ctx := context.Background()
	metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
	s.mockGlue.SetSession(tk.Session())
	err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lastTS, ts, false)
	require.NoErrorf(t, err, "Error getting ddl jobs: %s", err)
	err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
	require.NoError(t, err, "Flush failed", err)
	err = metaWriter.FlushBackupMeta(ctx)
	require.NoError(t, err, "Final flush of backup meta failed", err)

	metaBytes, err := s.storage.ReadFile(ctx, metautil.MetaFile)
	require.NoError(t, err)
	mockMeta := &backuppb.BackupMeta{}
	err = proto.Unmarshal(metaBytes, mockMeta)
	require.NoError(t, err)
	// read the DDL jobs back from the meta and check their count
	metaReader := metautil.NewMetaReader(mockMeta, s.storage, &cipher)
	allDDLJobsBytes, err := metaReader.ReadDDLs(ctx)
	require.NoError(t, err)
	var allDDLJobs []*model.Job
	err = json.Unmarshal(allDDLJobsBytes, &allDDLJobs)
	require.NoError(t, err)
	require.Len(t, allDDLJobs, 8)
}

func TestCheckBackupIsLocked(t *testing.T) {
	s := createBackupSuite(t)

	ctx := context.Background()

	// check passes with an empty storage
	err := backup.CheckBackupStorageIsLocked(ctx, s.storage)
	require.NoError(t, err)

	// check passes with only a lock file
	err = s.storage.WriteFile(ctx, metautil.LockFile, nil)
	require.NoError(t, err)
	err = backup.CheckBackupStorageIsLocked(ctx, s.storage)
	require.NoError(t, err)

	// check passes with a lock file and other non-sst files
	err = s.storage.WriteFile(ctx, "1.txt", nil)
	require.NoError(t, err)
	err = backup.CheckBackupStorageIsLocked(ctx, s.storage)
	require.NoError(t, err)

	// check fails once a lock file and an sst file coexist
	err = s.storage.WriteFile(ctx, "1.sst", nil)
	require.NoError(t, err)
	err = backup.CheckBackupStorageIsLocked(ctx, s.storage)
	require.Error(t, err)
	require.Regexp(t, "backup lock file and sst file exist in(.+)", err.Error())
}

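// TestOnBackupResponse drives backup.Client.OnBackupResponse through error,
// success, partial-success, and key-locked responses, checking the returned
// lock and the incomplete ranges left in the progress tree.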
func TestOnBackupResponse(t *testing.T) {
	s := createBackupSuite(t)

	ctx := context.Background()

	buildProgressRangeFn := func(startKey []byte, endKey []byte) *rtree.ProgressRange {
		return &rtree.ProgressRange{
			Res: rtree.NewRangeTree(),
			Origin: rtree.KeyRange{
				StartKey: startKey,
				EndKey:   endKey,
			},
		}
	}

	errContext := utils.NewErrorContext("test", 1)
	lock, err := s.backupClient.OnBackupResponse(ctx, nil, errContext, nil)
	require.NoError(t, err)
	require.Nil(t, lock)

	tree := rtree.NewProgressRangeTree(nil, false)
	r := &backup.ResponseAndStore{
		StoreID: 0,
		Resp: &backuppb.BackupResponse{
			Error: &backuppb.Error{
				Msg: "test",
			},
		},
	}
	// case #1: error response
	// the first error can be ignored thanks to errContext.
	lock, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
	require.NoError(t, err)
	require.Nil(t, lock)
	// the second error cannot be ignored.
	_, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
	require.Error(t, err)

	// case #2: normal response
	r = &backup.ResponseAndStore{
		StoreID: 0,
		Resp: &backuppb.BackupResponse{
			StartKey: []byte("a"),
			EndKey:   []byte("b"),
		},
	}

	require.NoError(t, tree.Insert(buildProgressRangeFn([]byte("aa"), []byte("c"))))
	// the response range does not match any tree range, so it is ignored
	// without reporting an error.
	lock, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
	require.Nil(t, lock)
	require.NoError(t, err)

	// case #3: partial range success case, find the incomplete range
	r.Resp.StartKey = []byte("aa")
	lock, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
	require.NoError(t, err)
	require.Nil(t, lock)

	incomplete, err := tree.GetIncompleteRanges()
	require.NoError(t, err)
	require.Len(t, incomplete, 1)
	require.Equal(t, []byte("b"), incomplete[0].StartKey)
	require.Equal(t, []byte("c"), incomplete[0].EndKey)

	// case #4: success case, make up the incomplete range
	r.Resp.StartKey = []byte("b")
	r.Resp.EndKey = []byte("c")
	lock, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
	require.NoError(t, err)
	require.Nil(t, lock)
	incomplete, err = tree.GetIncompleteRanges()
	require.NoError(t, err)
	require.Len(t, incomplete, 0)

	// case #5: failed case, the key is locked
	r = &backup.ResponseAndStore{
		StoreID: 0,
		Resp: &backuppb.BackupResponse{
			Error: &backuppb.Error{
				Detail: &backuppb.Error_KvError{
					KvError: &kvrpcpb.KeyError{
						Locked: &kvrpcpb.LockInfo{
							PrimaryLock: []byte("b"),
							LockVersion: 0,
							Key:         []byte("b"),
							LockTtl:     50,
							TxnSize:     1,
							LockType:    kvrpcpb.Op_Put,
						},
					},
				},
			},
		},
	}
	lock, err = s.backupClient.OnBackupResponse(ctx, r, errContext, &tree)
	require.NoError(t, err)
	require.Equal(t, []byte("b"), lock.Primary)
}

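// TestMainBackupLoop feeds mocked per-store responses into the main backup
// loop and covers five scenarios: a normal run with an offline store, a
// canceled context, a store that drops and never returns, a store that drops
// and restarts, and a store whose responses only appear after the watcher
// sees it come back.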
func TestMainBackupLoop(t *testing.T) {
	s := createBackupSuite(t)
	backgroundCtx := context.Background()

	// test backup with 2 stores
	s.mockCluster.AddStore(1, "127.0.0.1:20160")
	s.mockCluster.AddStore(2, "127.0.0.1:20161")

	stores, err := s.mockPDClient.GetAllStores(backgroundCtx)
	require.NoError(t, err)
	require.Len(t, stores, 2)

	// randomly generate bytes in range [a, b)
	genRandBytesFn := func(a, b []byte) ([]byte, error) {
		n := len(a)
		result := make([]byte, n)
		for {
			_, err := rand.Read(result)
			if err != nil {
				return nil, err
			}
			if bytes.Compare(result, a) > 0 && bytes.Compare(result, b) < 0 {
				return result, nil
			}
		}
	}
	// split each range into `limit` parts
	splitRangesFn := func(ranges []rtree.KeyRange, limit int) [][]byte {
		if len(ranges) == 0 {
			return nil
		}
		res := make([][]byte, 0)
		res = append(res, ranges[0].StartKey)
		for i := range ranges {
			partRes := make([][]byte, 0)
			for range limit {
				x, err := genRandBytesFn(ranges[i].StartKey, ranges[i].EndKey)
				require.NoError(t, err)
				partRes = append(partRes, x)
			}
			sort.Slice(partRes, func(i, j int) bool {
				return bytes.Compare(partRes[i], partRes[j]) < 0
			})
			res = append(res, partRes...)
		}
		res = append(res, ranges[len(ranges)-1].EndKey)
		return res
	}

	// Case #1: normal case
	ranges := []rtree.KeyRange{
		{
			StartKey: []byte("aaa"),
			EndKey:   []byte("zzz"),
		},
	}
	tree, err := s.backupClient.BuildProgressRangeTree(backgroundCtx, ranges, nil, func(backup.ProgressUnit) {})
	require.NoError(t, err)

	mockBackupResponses := make(map[uint64][]*backup.ResponseAndStore)
	splitKeys := splitRangesFn(ranges, 10)
	for i := range len(splitKeys) - 1 {
		randStoreID := uint64(rand.Int()%len(stores) + 1)
		mockBackupResponses[randStoreID] = append(mockBackupResponses[randStoreID], &backup.ResponseAndStore{
			StoreID: randStoreID,
			Resp: &backuppb.BackupResponse{
				StartKey: splitKeys[i],
				EndKey:   splitKeys[i+1],
			},
		})
	}
	ch := make(chan backup.BackupRetryPolicy)
	mainLoop := &backup.MainBackupLoop{
		BackupSender: &mockBackupBackupSender{
			backupResponses: mockBackupResponses,
		},

		BackupReq:               backuppb.BackupRequest{},
		GlobalProgressTree:      &tree,
		ReplicaReadLabel:        nil,
		StateNotifier:           ch,
		ProgressCallBack:        func(backup.ProgressUnit) {},
		GetBackupClientCallBack: mockGetBackupClientCallBack,
	}

	// an offline store can still be backed up.
	s.mockCluster.StopStore(1)
	connectedStore = make(map[uint64]int)
	require.NoError(t, s.backupClient.RunLoop(backgroundCtx, mainLoop))

	// Case #2: canceled case
	ranges = []rtree.KeyRange{
		{
			StartKey: []byte("aaa"),
			EndKey:   []byte("zzz"),
		},
	}
	tree, err = s.backupClient.BuildProgressRangeTree(backgroundCtx, ranges, nil, func(backup.ProgressUnit) {})
	require.NoError(t, err)

	clear(mockBackupResponses)
	splitKeys = splitRangesFn(ranges, 10)
	// the responses do not cover the whole range
	for i := range len(splitKeys) - 2 {
		randStoreID := uint64(rand.Int()%len(stores) + 1)
		mockBackupResponses[randStoreID] = append(mockBackupResponses[randStoreID], &backup.ResponseAndStore{
			StoreID: randStoreID,
			Resp: &backuppb.BackupResponse{
				StartKey: splitKeys[i],
				EndKey:   splitKeys[i+1],
			},
		})
	}
	mainLoop = &backup.MainBackupLoop{
		BackupSender: &mockBackupBackupSender{
			backupResponses: mockBackupResponses,
		},

		BackupReq:               backuppb.BackupRequest{},
		GlobalProgressTree:      &tree,
		ReplicaReadLabel:        nil,
		StateNotifier:           ch,
		ProgressCallBack:        func(backup.ProgressUnit) {},
		GetBackupClientCallBack: mockGetBackupClientCallBack,
	}

	// cancel the backup from another goroutine
	ctx, cancel := context.WithCancel(backgroundCtx)
	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel()
	}()
	connectedStore = make(map[uint64]int)
	require.Error(t, s.backupClient.RunLoop(ctx, mainLoop))

	// Case #3: one store drops and never comes back
	ranges = []rtree.KeyRange{
		{
			StartKey: []byte("aaa"),
			EndKey:   []byte("zzz"),
		},
	}
	tree, err = s.backupClient.BuildProgressRangeTree(backgroundCtx, ranges, nil, func(backup.ProgressUnit) {})
	require.NoError(t, err)

	clear(mockBackupResponses)
	splitKeys = splitRangesFn(ranges, 10)
	for i := range len(splitKeys) - 1 {
		randStoreID := uint64(rand.Int()%len(stores) + 1)
		mockBackupResponses[randStoreID] = append(mockBackupResponses[randStoreID], &backup.ResponseAndStore{
			StoreID: randStoreID,
			Resp: &backuppb.BackupResponse{
				StartKey: splitKeys[i],
				EndKey:   splitKeys[i+1],
			},
		})
	}
	remainStoreID := uint64(1)
	dropStoreID := uint64(2)
	s.mockCluster.MarkTombstone(dropStoreID)
	dropBackupResponses := mockBackupResponses[dropStoreID]
	lock.Lock()
	mockBackupResponses[dropStoreID] = nil
	lock.Unlock()

	mainLoop = &backup.MainBackupLoop{
		BackupSender: &mockBackupBackupSender{
			backupResponses: mockBackupResponses,
		},

		BackupReq:               backuppb.BackupRequest{},
		GlobalProgressTree:      &tree,
		ReplicaReadLabel:        nil,
		StateNotifier:           ch,
		ProgressCallBack:        func(backup.ProgressUnit) {},
		GetBackupClientCallBack: mockGetBackupClientCallBack,
	}
	go func() {
		// mock region leader balancing behaviour:
		// after a while, store 1 owns the dropped store's ranges.
		time.Sleep(500 * time.Millisecond)
		lock.Lock()
		mockBackupResponses[remainStoreID] = append(mockBackupResponses[remainStoreID], dropBackupResponses...)
		lock.Unlock()
	}()

	connectedStore = make(map[uint64]int)
	// the backup can only finish after store 1 has responded with the full ranges.
	require.NoError(t, s.backupClient.RunLoop(backgroundCtx, mainLoop))
	// store 1 is connected to more than once
	require.Less(t, 1, connectedStore[remainStoreID])
	// a dropped store is never connected to.
	require.Equal(t, 0, connectedStore[dropStoreID])

	// Case #4: one store drops and comes back soon
	ranges = []rtree.KeyRange{
		{
			StartKey: []byte("aaa"),
			EndKey:   []byte("zzz"),
		},
	}
	tree, err = s.backupClient.BuildProgressRangeTree(backgroundCtx, ranges, nil, func(backup.ProgressUnit) {})
	require.NoError(t, err)

	clear(mockBackupResponses)
	splitKeys = splitRangesFn(ranges, 10)
	for i := range len(splitKeys) - 1 {
		randStoreID := uint64(rand.Int()%len(stores) + 1)
		mockBackupResponses[randStoreID] = append(mockBackupResponses[randStoreID], &backup.ResponseAndStore{
			StoreID: randStoreID,
			Resp: &backuppb.BackupResponse{
				StartKey: splitKeys[i],
				EndKey:   splitKeys[i+1],
			},
		})
	}
	remainStoreID = uint64(1)
	dropStoreID = uint64(2)
	s.mockCluster.MarkTombstone(dropStoreID)

	mainLoop = &backup.MainBackupLoop{
		BackupSender: &mockBackupBackupSender{
			backupResponses: mockBackupResponses,
		},

		BackupReq:               backuppb.BackupRequest{},
		GlobalProgressTree:      &tree,
		ReplicaReadLabel:        nil,
		StateNotifier:           ch,
		ProgressCallBack:        func(backup.ProgressUnit) {},
		GetBackupClientCallBack: mockGetBackupClientCallBack,
	}
	go func() {
		time.Sleep(500 * time.Millisecond)
		lock.Lock()
		s.mockCluster.StartStore(dropStoreID)
		lock.Unlock()
	}()

	connectedStore = make(map[uint64]int)
	// the backup can only finish after the stores have responded with the full ranges.
	require.NoError(t, s.backupClient.RunLoop(backgroundCtx, mainLoop))
	// store 1 is connected to more than once
	require.Less(t, 1, connectedStore[remainStoreID])
	// a single connection to store 2 should finish the backup.
	require.Equal(t, 1, connectedStore[dropStoreID])

	// Case #5: one store drops and is watched coming back
	ranges = []rtree.KeyRange{
		{
			StartKey: []byte("aaa"),
			EndKey:   []byte("zzz"),
		},
	}
	tree, err = s.backupClient.BuildProgressRangeTree(backgroundCtx, ranges, nil, func(backup.ProgressUnit) {})
	require.NoError(t, err)

	clear(mockBackupResponses)
	splitKeys = splitRangesFn(ranges, 10)
	for i := range len(splitKeys) - 1 {
		randStoreID := uint64(rand.Int()%len(stores) + 1)
		mockBackupResponses[randStoreID] = append(mockBackupResponses[randStoreID], &backup.ResponseAndStore{
			StoreID: randStoreID,
			Resp: &backuppb.BackupResponse{
				StartKey: splitKeys[i],
				EndKey:   splitKeys[i+1],
			},
		})
	}
	remainStoreID = uint64(1)
	dropStoreID = uint64(2)
	// make store 2 return no responses for now.
	dropBackupResponses = mockBackupResponses[dropStoreID]
	lock.Lock()
	mockBackupResponses[dropStoreID] = nil
	lock.Unlock()

	mainLoop = &backup.MainBackupLoop{
		BackupSender: &mockBackupBackupSender{
			backupResponses: mockBackupResponses,
		},

		BackupReq:               backuppb.BackupRequest{},
		GlobalProgressTree:      &tree,
		ReplicaReadLabel:        nil,
		StateNotifier:           ch,
		ProgressCallBack:        func(backup.ProgressUnit) {},
		GetBackupClientCallBack: mockGetBackupClientCallBack,
	}
	go func() {
		time.Sleep(500 * time.Millisecond)
		lock.Lock()
		mockBackupResponses[dropStoreID] = dropBackupResponses
		lock.Unlock()
	}()

	connectedStore = make(map[uint64]int)
	// the backup can only finish after the stores have responded with the full ranges.
	require.NoError(t, s.backupClient.RunLoop(backgroundCtx, mainLoop))
	// store 1 is connected to once; the handle loop then blocks until store 2
	// is watched coming back.
	require.Equal(t, 1, connectedStore[remainStoreID])
	// a single connection to store 2 should finish the backup.
	require.Equal(t, 1, connectedStore[dropStoreID])
}

func TestBuildProgressRangeTree(t *testing.T) {
	s := createBackupSuite(t)
	ranges := []rtree.KeyRange{
		{
			StartKey: []byte("aa"),
			EndKey:   []byte("b"),
		},
		{
			StartKey: []byte("c"),
			EndKey:   []byte("d"),
		},
		{
			StartKey: []byte("f"),
			EndKey:   []byte("g"),
		},
	}
	tree, err := s.backupClient.BuildProgressRangeTree(context.Background(), ranges, nil, func(backup.ProgressUnit) {})
	require.NoError(t, err)

	contained, err := tree.FindContained([]byte("a"), []byte("aa"))
	require.Nil(t, contained)
	require.NoError(t, err)

	contained, err = tree.FindContained([]byte("b"), []byte("ba"))
	require.Nil(t, contained)
	require.NoError(t, err)

	contained, err = tree.FindContained([]byte("e"), []byte("ea"))
	require.Nil(t, contained)
	require.NoError(t, err)

	contained, err = tree.FindContained([]byte("aa"), []byte("b"))
	require.NotNil(t, contained)
	require.Equal(t, []byte("aa"), contained.Origin.StartKey)
	require.Equal(t, []byte("b"), contained.Origin.EndKey)
	require.NoError(t, err)

	contained, err = tree.FindContained([]byte("cc"), []byte("e"))
	require.Nil(t, contained)
	require.Error(t, err)

	contained, err = tree.FindContained([]byte("e"), []byte("ff"))
	require.Nil(t, contained)
	require.NoError(t, err)
}

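// TestObserveStoreChangesAsync verifies the store watcher with the
// `backup-store-change-tick` failpoint enabled: it stays silent while nothing
// changes, notifies about a newly joined store, and notifies about a
// disconnected store.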
func TestObserveStoreChangesAsync(t *testing.T) {
	s := createBackupSuite(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// test with 2 stores
	s.mockCluster.AddStore(1, "127.0.0.1:20160")
	s.mockCluster.AddStore(2, "127.0.0.1:20161")

	stores, err := s.mockPDClient.GetAllStores(ctx)
	require.NoError(t, err)
	require.Len(t, stores, 2)

	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/backup/backup-store-change-tick", `return(true)`))

	// case #1: nothing happens
	ch := make(chan backup.BackupRetryPolicy, 1)
	backup.ObserveStoreChangesAsync(ctx, ch, s.mockPDClient)
	// the channel never receives any message
	require.Never(t, func() bool { return len(ch) > 0 }, time.Second, 100*time.Millisecond)

	// case #2: a new store joins
	s.mockCluster.AddStore(3, "127.0.0.1:20162")
	// the channel receives the new store
	require.Eventually(t, func() bool {
		if len(ch) == 0 {
			return false
		}
		res := <-ch
		return res.One == 3
	}, time.Second, 100*time.Millisecond)

	// case #3: one store disconnects
	s.mockCluster.StopStore(2)
	// the channel receives the store-disconnected notification
	require.Eventually(t, func() bool {
		if len(ch) == 0 {
			return false
		}
		res := <-ch
		return res.All
	}, time.Second, 100*time.Millisecond)

	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/backup/backup-store-change-tick"))
}

func genSubRanges(req *backuppb.BackupRequest, count int) {
	for i := range count {
		req.SubRanges = append(req.SubRanges, &kvrpcpb.KeyRange{
			StartKey: []byte{byte(i)},
			EndKey:   []byte{byte(i + 1)},
		})
	}
}

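// The assertions in the two tests below imply the following split rule: for n
// subranges and a split count k (0 < k < n), the first n%k parts receive
// ceil(n/k) subranges and the remaining parts receive floor(n/k), preserving
// the original order. For example, 10 subranges split into 3 parts yield
// sizes {4, 3, 3}.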
func TestSplitBackupReqRanges(t *testing.T) {
	req := backuppb.BackupRequest{
		SubRanges: []*kvrpcpb.KeyRange{},
	}

	// case #1: empty ranges
	res := backup.SplitBackupReqRanges(req, 1)
	require.Len(t, res, 1)
	// case #2: empty ranges and a limit of 0
	res = backup.SplitBackupReqRanges(req, 0)
	require.Len(t, res, 1)

	genSubRanges(&req, 10)
	// case #3: 10 subranges split into 10 parts
	res = backup.SplitBackupReqRanges(req, 10)
	require.Len(t, res, 10)
	for i := range 10 {
		require.Equal(t, res[i].SubRanges[0].StartKey, req.SubRanges[i].StartKey)
		require.Equal(t, res[i].SubRanges[0].EndKey, req.SubRanges[i].EndKey)
	}

	// case #3.1: 10 subranges split into 11 parts (no different from 10 parts)
	res = backup.SplitBackupReqRanges(req, 11)
	require.Len(t, res, 10)
	for i := range 10 {
		require.Equal(t, res[i].SubRanges[0].StartKey, req.SubRanges[i].StartKey)
		require.Equal(t, res[i].SubRanges[0].EndKey, req.SubRanges[i].EndKey)
	}

	// case #3.2: 10 subranges split into 9 parts; the first part gets 2 subranges
	res = backup.SplitBackupReqRanges(req, 9)
	require.Len(t, res, 9)
	for i := range 10 {
		if i == 0 {
			require.Equal(t, res[0].SubRanges[0].StartKey, req.SubRanges[i].StartKey)
			require.Equal(t, res[0].SubRanges[0].EndKey, req.SubRanges[i].EndKey)
		} else if i == 1 {
			require.Equal(t, res[0].SubRanges[1].StartKey, req.SubRanges[i].StartKey)
			require.Equal(t, res[0].SubRanges[1].EndKey, req.SubRanges[i].EndKey)
		} else {
			require.Equal(t, res[i-1].SubRanges[0].StartKey, req.SubRanges[i].StartKey)
			require.Equal(t, res[i-1].SubRanges[0].EndKey, req.SubRanges[i].EndKey)
		}
	}

	// case #4: 10 subranges split into 3 parts; the first part has 4 subranges
	// and the other parts have 3 subranges each
	res = backup.SplitBackupReqRanges(req, 3)
	require.Len(t, res, 3)
	for i := range 3 {
		if i == 0 {
			require.Len(t, res[0].SubRanges, 4)
			for j := range 4 {
				require.Equal(t, res[0].SubRanges[j].StartKey, req.SubRanges[j].StartKey)
				require.Equal(t, res[0].SubRanges[j].EndKey, req.SubRanges[j].EndKey)
			}
		} else {
			require.Len(t, res[i].SubRanges, 3)
			for j := range 3 {
				require.Equal(t, res[i].SubRanges[j].StartKey, req.SubRanges[i*3+j+1].StartKey)
				require.Equal(t, res[i].SubRanges[j].EndKey, req.SubRanges[i*3+j+1].EndKey)
			}
		}
	}
}

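// TestSplitBackupReqRanges2 is a table-driven check of the same size
// distribution over several (totalLen, splitN) combinations, also verifying
// that the subranges keep their original order.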
func TestSplitBackupReqRanges2(t *testing.T) {
	cases := []struct {
		totalLen int
		splitN   int
		lens     []int
	}{
		{totalLen: 8, splitN: 0, lens: []int{8}},
		{totalLen: 8, splitN: 1, lens: []int{8}},
		{totalLen: 8, splitN: 2, lens: []int{4, 4}},
		{totalLen: 8, splitN: 3, lens: []int{3, 3, 2}},
		{totalLen: 8, splitN: 4, lens: []int{2, 2, 2, 2}},
		{totalLen: 8, splitN: 5, lens: []int{2, 2, 2, 1, 1}},
		{totalLen: 8, splitN: 6, lens: []int{2, 2, 1, 1, 1, 1}},
		{totalLen: 8, splitN: 7, lens: []int{2, 1, 1, 1, 1, 1, 1}},
		{totalLen: 8, splitN: 8, lens: []int{1, 1, 1, 1, 1, 1, 1, 1}},
		{totalLen: 8, splitN: 9, lens: []int{1, 1, 1, 1, 1, 1, 1, 1}},
		{totalLen: 8, splitN: 1024, lens: []int{1, 1, 1, 1, 1, 1, 1, 1}},
		{totalLen: 73, splitN: 13, lens: []int{6, 6, 6, 6, 6, 6, 6, 6, 5, 5, 5, 5, 5}},
	}

	for _, cs := range cases {
		req := backuppb.BackupRequest{
			SubRanges: []*kvrpcpb.KeyRange{},
		}
		genSubRanges(&req, cs.totalLen)
		res := backup.SplitBackupReqRanges(req, cs.splitN)
		rangesIndex := 0
		for i, l := range cs.lens {
			require.Len(t, res[i].SubRanges, l)
			for _, subRanges := range res[i].SubRanges {
				require.Equal(t, req.SubRanges[rangesIndex].StartKey, subRanges.StartKey)
				require.Equal(t, req.SubRanges[rangesIndex].EndKey, subRanges.EndKey)
				rangesIndex += 1
			}
		}
	}
}