// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package streamhelper_test

import (
	"bytes"
	"context"
	"encoding/hex"
	"fmt"
	"io"
	"math"
	"math/rand"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/pingcap/errors"
	backup "github.com/pingcap/kvproto/pkg/brpb"
	"github.com/pingcap/kvproto/pkg/errorpb"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/streamhelper"
	"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
	"github.com/pingcap/tidb/br/pkg/utils"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/util/codec"
	"github.com/tikv/client-go/v2/oracle"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/client-go/v2/tikvrpc"
	"github.com/tikv/client-go/v2/txnkv/txnlock"
	pd "github.com/tikv/pd/client"
	"github.com/tikv/pd/client/clients/router"
	"github.com/tikv/pd/client/opt"
	"github.com/tikv/pd/client/pkg/caller"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)
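// flushSimulator emulates the flush state of a single TiKV region for tests:
// when enabled, checkpoint queries fail with "not flushed" until a flush has
// happened, and with "flushed epoch not match" when the flushed epoch differs
// from the requested one.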
type flushSimulator struct {
	flushedEpoch atomic.Uint64
	enabled      bool
}

func (c *flushSimulator) makeError(requestedEpoch uint64) *errorpb.Error {
	if !c.enabled {
		return nil
	}
	if c.flushedEpoch.Load() == 0 {
		e := errorpb.Error{
			Message: "not flushed",
		}
		return &e
	}
	if c.flushedEpoch.Load() != requestedEpoch {
		e := errorpb.Error{
			Message: "flushed epoch not match",
		}
		return &e
	}
	return nil
}

func (c *flushSimulator) fork() flushSimulator {
	return flushSimulator{
		enabled: c.enabled,
	}
}
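// region is an in-memory stand-in for a TiKV region: a key range with a
// leader store, an epoch that is bumped on every split, a log-backup
// checkpoint, and an optional set of locks for lock-resolving tests.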
type region struct {
	rng        spans.Span
	leader     uint64
	epoch      uint64
	id         uint64
	checkpoint atomic.Uint64

	fsim flushSimulator

	locks []*txnlock.Lock
}
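// fakeStore mimics one TiKV store. It implements the parts of
// logbackup.LogBackupClient that the advancer uses, serving the regions it
// currently holds and optionally a flush-event subscription.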
type fakeStore struct {
	id      uint64
	regions map[uint64]*region

	clientMu              sync.Mutex
	supportsSub           bool
	bootstrapAt           uint64
	fsub                  func(logbackup.SubscribeFlushEventResponse)
	onGetRegionCheckpoint func(*logbackup.GetLastFlushTSOfRegionRequest) error
}
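// fakeCluster is an in-memory cluster of fakeStores and regions. It provides
// the region/store/GC-safepoint views the checkpoint advancer needs, plus
// test hooks (onGetClient, onClearCache) for injecting failures.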
type fakeCluster struct {
	mu        sync.Mutex
	idAlloced uint64
	stores    map[uint64]*fakeStore
	regions   []*region
	maxTs     uint64
	testCtx   *testing.T

	onGetClient               func(uint64) error
	onClearCache              func(uint64) error
	serviceGCSafePoint        uint64
	serviceGCSafePointDeleted bool
	currentTS                 uint64
}
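// splitAt splits the region at key k: the receiver keeps [StartKey, k) while
// a new region with id newID takes [k, EndKey). Both halves get a bumped
// epoch and a fresh flush simulator; the new half inherits the checkpoint.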
func (r *region) splitAt(newID uint64, k string) *region {
	newRegion := &region{
		rng:    kv.KeyRange{StartKey: []byte(k), EndKey: r.rng.EndKey},
		leader: r.leader,
		epoch:  r.epoch + 1,
		id:     newID,
		fsim:   r.fsim.fork(),
	}
	newRegion.checkpoint.Store(r.checkpoint.Load())
	r.rng.EndKey = []byte(k)
	r.epoch += 1
	r.fsim = r.fsim.fork()
	return newRegion
}

func (r *region) flush() {
	r.fsim.flushedEpoch.Store(r.epoch)
}
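// trivialFlushStream is a channel-backed implementation of the
// SubscribeFlushEvent client stream: Recv drains the channel, returns io.EOF
// when it is closed, and returns a Canceled error once the context is done
// and no buffered event is left.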
type trivialFlushStream struct {
	c  <-chan logbackup.SubscribeFlushEventResponse
	cx context.Context
}

func (t trivialFlushStream) Recv() (*logbackup.SubscribeFlushEventResponse, error) {
	select {
	case item, ok := <-t.c:
		if !ok {
			return nil, io.EOF
		}
		return &item, nil
	case <-t.cx.Done():
		select {
		case item, ok := <-t.c:
			if !ok {
				return nil, io.EOF
			}
			return &item, nil
		default:
		}
		return nil, status.Error(codes.Canceled, t.cx.Err().Error())
	}
}

func (t trivialFlushStream) Header() (metadata.MD, error) {
	return make(metadata.MD), nil
}

func (t trivialFlushStream) Trailer() metadata.MD {
	return make(metadata.MD)
}

func (t trivialFlushStream) CloseSend() error {
	return nil
}

func (t trivialFlushStream) Context() context.Context {
	return t.cx
}

func (t trivialFlushStream) SendMsg(m any) error {
	return nil
}

func (t trivialFlushStream) RecvMsg(m any) error {
	return nil
}
func (f *fakeStore) FlushNow(ctx context.Context, in *logbackup.FlushNowRequest, opts ...grpc.CallOption) (*logbackup.FlushNowResponse, error) {
	f.flush()
	return &logbackup.FlushNowResponse{Results: []*logbackup.FlushResult{{TaskName: "Universe", Success: true}}}, nil
}

func (f *fakeStore) GetID() uint64 {
	return f.id
}

func (f *fakeStore) SubscribeFlushEvent(ctx context.Context, in *logbackup.SubscribeFlushEventRequest, opts ...grpc.CallOption) (logbackup.LogBackup_SubscribeFlushEventClient, error) {
	f.clientMu.Lock()
	defer f.clientMu.Unlock()
	if !f.supportsSub {
		return nil, status.Error(codes.Unimplemented, "meow?")
	}

	ch := make(chan logbackup.SubscribeFlushEventResponse, 1024)
	f.fsub = func(glftrr logbackup.SubscribeFlushEventResponse) {
		ch <- glftrr
	}
	return trivialFlushStream{c: ch, cx: ctx}, nil
}

func (f *fakeStore) SetSupportFlushSub(b bool) {
	f.clientMu.Lock()
	defer f.clientMu.Unlock()

	f.bootstrapAt += 1
	f.supportsSub = b
}
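// GetLastFlushTSOfRegion answers checkpoint queries for the requested
// regions, reproducing the error cases a real TiKV would report: region not
// found or not led by this store, not yet flushed, or epoch mismatch.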
func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.GetLastFlushTSOfRegionRequest, opts ...grpc.CallOption) (*logbackup.GetLastFlushTSOfRegionResponse, error) {
	if f.onGetRegionCheckpoint != nil {
		err := f.onGetRegionCheckpoint(in)
		if err != nil {
			return nil, err
		}
	}
	resp := &logbackup.GetLastFlushTSOfRegionResponse{
		Checkpoints: []*logbackup.RegionCheckpoint{},
	}
	for _, r := range in.Regions {
		region, ok := f.regions[r.Id]
		if !ok || region.leader != f.id {
			// Use the identity from the request here: when the region isn't
			// present on this store, `region` is nil and must not be touched.
			resp.Checkpoints = append(resp.Checkpoints, &logbackup.RegionCheckpoint{
				Err: &errorpb.Error{
					Message: "not found",
				},
				Region: &logbackup.RegionIdentity{
					Id:           r.Id,
					EpochVersion: r.EpochVersion,
				},
			})
			continue
		}
		if err := region.fsim.makeError(r.EpochVersion); err != nil {
			resp.Checkpoints = append(resp.Checkpoints, &logbackup.RegionCheckpoint{
				Err: err,
				Region: &logbackup.RegionIdentity{
					Id:           region.id,
					EpochVersion: region.epoch,
				},
			})
			continue
		}
		if region.epoch != r.EpochVersion {
			resp.Checkpoints = append(resp.Checkpoints, &logbackup.RegionCheckpoint{
				Err: &errorpb.Error{
					Message: "epoch not match",
				},
				Region: &logbackup.RegionIdentity{
					Id:           region.id,
					EpochVersion: region.epoch,
				},
			})
			continue
		}
		resp.Checkpoints = append(resp.Checkpoints, &logbackup.RegionCheckpoint{
			Checkpoint: region.checkpoint.Load(),
			Region: &logbackup.RegionIdentity{
				Id:           region.id,
				EpochVersion: region.epoch,
			},
		})
	}
	log.Debug("Get last flush ts of region", zap.Stringer("in", in), zap.Stringer("out", resp))
	return resp, nil
}
// BlockGCUntil updates the service GC safe point for the cluster and returns
// the latest service GC safe point.
// If the argument is `0`, this would remove the service safe point.
func (f *fakeCluster) BlockGCUntil(ctx context.Context, at uint64) (uint64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.serviceGCSafePoint > at {
		return f.serviceGCSafePoint, errors.Errorf("minimal safe point %d is greater than the target %d", f.serviceGCSafePoint, at)
	}
	f.serviceGCSafePoint = at
	return at, nil
}

func (f *fakeCluster) UnblockGC(ctx context.Context) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.serviceGCSafePointDeleted = true
	return nil
}

func (f *fakeCluster) FetchCurrentTS(ctx context.Context) (uint64, error) {
	return f.currentTS, nil
}
// RegionScan gets a list of regions, starting from the region that contains key.
// Limit limits the maximum number of regions returned.
func (f *fakeCluster) RegionScan(ctx context.Context, key []byte, endKey []byte, limit int) ([]streamhelper.RegionWithLeader, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	sort.Slice(f.regions, func(i, j int) bool {
		return bytes.Compare(f.regions[i].rng.StartKey, f.regions[j].rng.StartKey) < 0
	})

	result := make([]streamhelper.RegionWithLeader, 0, limit)
	for _, region := range f.regions {
		if spans.Overlaps(kv.KeyRange{StartKey: key, EndKey: endKey}, region.rng) && len(result) < limit {
			regionInfo := streamhelper.RegionWithLeader{
				Region: &metapb.Region{
					Id:       region.id,
					StartKey: region.rng.StartKey,
					EndKey:   region.rng.EndKey,
					RegionEpoch: &metapb.RegionEpoch{
						Version: region.epoch,
					},
				},
				Leader: &metapb.Peer{
					StoreId: region.leader,
				},
			}
			result = append(result, regionInfo)
		} else if bytes.Compare(region.rng.StartKey, key) > 0 {
			break
		}
	}
	return result, nil
}
func (f *fakeCluster) GetLogBackupClient(ctx context.Context, storeID uint64) (logbackup.LogBackupClient, error) {
	if f.onGetClient != nil {
		err := f.onGetClient(storeID)
		if err != nil {
			return nil, err
		}
	}
	cli, ok := f.stores[storeID]
	if !ok {
		f.testCtx.Fatalf("the store %d doesn't exist", storeID)
	}
	return cli, nil
}

func (f *fakeCluster) ClearCache(ctx context.Context, storeID uint64) error {
	if f.onClearCache != nil {
		return f.onClearCache(storeID)
	}
	return nil
}

// Stores returns the store metadata from the cluster.
func (f *fakeCluster) Stores(ctx context.Context) ([]streamhelper.Store, error) {
	r := make([]streamhelper.Store, 0, len(f.stores))
	for id, s := range f.stores {
		r = append(r, streamhelper.Store{ID: id, BootAt: s.bootstrapAt})
	}
	return r, nil
}

func (f *fakeCluster) findRegionById(rid uint64) *region {
	for _, r := range f.regions {
		if r.id == rid {
			return r
		}
	}
	return nil
}

func (f *fakeCluster) LockRegion(r *region, locks []*txnlock.Lock) *region {
	r.locks = locks
	return r
}

func (f *fakeCluster) findRegionByKey(key []byte) *region {
	for _, r := range f.regions {
		if bytes.Compare(key, r.rng.StartKey) >= 0 && (len(r.rng.EndKey) == 0 || bytes.Compare(key, r.rng.EndKey) < 0) {
			return r
		}
	}
	panic(fmt.Sprintf("inconsistent key space; key = %X", key))
}

func (f *fakeCluster) transferRegionTo(rid uint64, newPeers []uint64) {
	r := f.findRegionById(rid)
storeLoop:
	for _, store := range f.stores {
		for _, pid := range newPeers {
			if pid == store.id {
				store.regions[rid] = r
				continue storeLoop
			}
		}
		delete(store.regions, rid)
	}
}

func (f *fakeCluster) splitAt(key string) {
	k := []byte(key)
	r := f.findRegionByKey(k)
	newRegion := r.splitAt(f.idAlloc(), key)
	for _, store := range f.stores {
		_, ok := store.regions[r.id]
		if ok {
			store.regions[newRegion.id] = newRegion
		}
	}
	f.regions = append(f.regions, newRegion)
}

func (f *fakeCluster) idAlloc() uint64 {
	f.idAlloced++
	return f.idAlloced
}

func (f *fakeCluster) chooseStores(n int) []uint64 {
	s := make([]uint64, 0, len(f.stores))
	for id := range f.stores {
		s = append(s, id)
	}
	rand.Shuffle(len(s), func(i, j int) {
		s[i], s[j] = s[j], s[i]
	})
	return s[:n]
}

func (f *fakeCluster) findPeers(rid uint64) (result []uint64) {
	for _, store := range f.stores {
		if _, ok := store.regions[rid]; ok {
			result = append(result, store.id)
		}
	}
	return
}

func (f *fakeCluster) shuffleLeader(rid uint64) {
	r := f.findRegionById(rid)
	peers := f.findPeers(rid)
	rand.Shuffle(len(peers), func(i, j int) {
		peers[i], peers[j] = peers[j], peers[i]
	})

	newLeader := peers[0]
	r.leader = newLeader
}
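// splitAndScatter splits the key space at the given keys, then places every
// region on three randomly chosen stores and picks a random leader among
// them, roughly imitating PD's split-and-scatter behaviour.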
func (f *fakeCluster) splitAndScatter(keys ...string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, key := range keys {
		f.splitAt(key)
	}
	for _, r := range f.regions {
		chosen := f.chooseStores(3)
		f.transferRegionTo(r.id, chosen)
		f.shuffleLeader(r.id)
	}
}

// removeStore removes a store.
// Note: this won't add new peers for the regions held by the removed store.
func (f *fakeCluster) removeStore(id uint64) {
	f.mu.Lock()
	defer f.mu.Unlock()

	s := f.stores[id]
	for _, r := range s.regions {
		if r.leader == id {
			f.updateRegion(r.id, func(r *region) {
				ps := f.findPeers(r.id)
				for _, p := range ps {
					if p != r.leader {
						log.Info("remove store: transferring leader",
							zap.Uint64("region", r.id),
							zap.Uint64("new-leader", p),
							zap.Uint64("old-leader", r.leader))
						r.leader = p
						break
					}
				}
			})
		}
	}

	delete(f.stores, id)
}

// updateRegion is a stub kept for the future, in case different stores need
// to hold distinct region instances.
func (f *fakeCluster) updateRegion(rid uint64, mut func(*region)) {
	r := f.findRegionById(rid)
	mut(r)
}
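// advanceCheckpoints bumps every region's checkpoint by a random amount,
// resets its flush state, and returns the new minimal checkpoint across the
// cluster (i.e. what the global checkpoint may advance to after a flush).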
func (f *fakeCluster) advanceCheckpoints() uint64 {
	minCheckpoint := uint64(math.MaxUint64)
	for _, r := range f.regions {
		f.updateRegion(r.id, func(r *region) {
			// The current implementation assumes that the server never returns a checkpoint with value 0.
			// This assumption holds for the TiKV implementation, so we simulate it here.
			cp := r.checkpoint.Add(rand.Uint64()%256 + 1)
			if cp < minCheckpoint {
				minCheckpoint = cp
			}
			r.fsim.flushedEpoch.Store(0)
		})
	}
	log.Info("checkpoint updated", zap.Uint64("to", minCheckpoint))
	return minCheckpoint
}

func (f *fakeCluster) advanceCheckpointBy(duration time.Duration) uint64 {
	minCheckpoint := uint64(math.MaxUint64)
	for _, r := range f.regions {
		f.updateRegion(r.id, func(r *region) {
			newCheckpointTime := oracle.GetTimeFromTS(r.checkpoint.Load()).Add(duration)
			newCheckpoint := oracle.GoTimeToTS(newCheckpointTime)
			r.checkpoint.Store(newCheckpoint)
			if newCheckpoint < minCheckpoint {
				minCheckpoint = newCheckpoint
			}
			r.fsim.flushedEpoch.Store(0)
		})
	}
	log.Info("checkpoint updated", zap.Uint64("to", minCheckpoint))
	return minCheckpoint
}

func (f *fakeCluster) advanceClusterTimeBy(duration time.Duration) uint64 {
	newTime := oracle.GoTimeToTS(oracle.GetTimeFromTS(f.currentTS).Add(duration))
	f.currentTS = newTime
	return newTime
}
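// createFakeCluster builds a cluster with n stores and a single region
// covering the whole key space, placed on (up to) the first three stores with
// the first store as leader. simEnabled controls whether the flush simulator
// injects "not flushed"/"epoch not match" errors until a flush happens.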
func createFakeCluster(t *testing.T, n int, simEnabled bool) *fakeCluster {
	c := &fakeCluster{
		stores:             map[uint64]*fakeStore{},
		regions:            []*region{},
		testCtx:            t,
		serviceGCSafePoint: 0,
	}
	stores := make([]*fakeStore, 0, n)
	for range n {
		s := new(fakeStore)
		s.id = c.idAlloc()
		s.regions = map[uint64]*region{}
		stores = append(stores, s)
	}
	initialRegion := &region{
		rng:    kv.KeyRange{},
		leader: stores[0].id,
		epoch:  0,
		id:     c.idAlloc(),
		fsim: flushSimulator{
			enabled: simEnabled,
		},
	}
	for i := range 3 {
		if i < len(stores) {
			stores[i].regions[initialRegion.id] = initialRegion
		}
	}
	for _, s := range stores {
		c.stores[s.id] = s
	}
	c.regions = append(c.regions, initialRegion)
	return c
}

func (r *region) String() string {
	return fmt.Sprintf("%d(%d):[%s, %s);%dL%dF%d",
		r.id,
		r.epoch,
		hex.EncodeToString(r.rng.StartKey),
		hex.EncodeToString(r.rng.EndKey),
		r.checkpoint.Load(),
		r.leader,
		r.fsim.flushedEpoch.Load())
}

func (f *fakeStore) String() string {
	buf := new(strings.Builder)
	fmt.Fprintf(buf, "%d: ", f.id)
	for _, r := range f.regions {
		fmt.Fprintf(buf, "%s ", r)
	}
	return buf.String()
}

func (f *fakeCluster) flushAll() {
	for _, r := range f.stores {
		r.flush()
	}
}

func (f *fakeCluster) flushAllExcept(keys ...string) {
	for _, s := range f.stores {
		s.flushExcept(keys...)
	}
}
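// flushExcept flushes every region this store leads, except regions whose
// range contains one of the given keys, and then publishes the resulting
// flush events to the current subscriber (if any).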
func (f *fakeStore) flushExcept(keys ...string) {
	resp := make([]*logbackup.FlushEvent, 0, len(f.regions))
outer:
	for _, r := range f.regions {
		if r.leader != f.id {
			continue
		}
		// Note: can we make it faster?
		for _, key := range keys {
			if utils.CompareBytesExt(r.rng.StartKey, false, []byte(key), false) <= 0 &&
				utils.CompareBytesExt([]byte(key), false, r.rng.EndKey, true) < 0 {
				continue outer
			}
		}
		if r.leader == f.id {
			r.flush()
			resp = append(resp, &logbackup.FlushEvent{
				StartKey:   codec.EncodeBytes(nil, r.rng.StartKey),
				EndKey:     codec.EncodeBytes(nil, r.rng.EndKey),
				Checkpoint: r.checkpoint.Load(),
			})
		}
	}

	if f.fsub != nil {
		f.fsub(logbackup.SubscribeFlushEventResponse{
			Events: resp,
		})
	}
}

func (f *fakeStore) flush() {
	f.flushExcept()
}

func (f *fakeCluster) String() string {
	buf := new(strings.Builder)
	fmt.Fprint(buf, ">>> fake cluster <<<\nregions: ")
	for _, region := range f.regions {
		fmt.Fprint(buf, region, " ")
	}
	fmt.Fprintln(buf)
	for _, store := range f.stores {
		fmt.Fprintln(buf, store)
	}
	return buf.String()
}
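// testEnv wraps a fakeCluster with the task/checkpoint metadata side of the
// advancer environment: it records the uploaded global checkpoint, feeds task
// events through taskCh, and provides lock scanning/resolving stubs.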
type testEnv struct {
	*fakeCluster
	checkpoint     uint64
	pdDisconnected atomic.Bool
	testCtx        *testing.T
	ranges         []kv.KeyRange
	taskCh         chan<- streamhelper.TaskEvent
	task           streamhelper.TaskEvent

	resolveLocks func([]*txnlock.Lock, *tikv.KeyLocation) (*tikv.KeyLocation, error)

	mu sync.Mutex
	pd.Client
}

func newTestEnv(c *fakeCluster, t *testing.T) *testEnv {
	env := &testEnv{
		fakeCluster: c,
		testCtx:     t,
	}
	rngs := env.ranges
	if len(rngs) == 0 {
		rngs = []kv.KeyRange{{}}
	}
	env.task = streamhelper.TaskEvent{
		Type: streamhelper.EventAdd,
		Name: "whole",
		Info: &backup.StreamBackupTaskInfo{
			Name:    "whole",
			StartTs: 0,
		},
		Ranges: rngs,
	}
	return env
}
func (t *testEnv) Begin(ctx context.Context, ch chan<- streamhelper.TaskEvent) error {
	ch <- t.task
	t.taskCh = ch
	return nil
}

func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string, checkpoint uint64) error {
	t.mu.Lock()
	defer t.mu.Unlock()

	if checkpoint < t.checkpoint {
		log.Error("checkpoint rolling back",
			zap.Uint64("from", t.checkpoint),
			zap.Uint64("to", checkpoint),
			zap.Stack("stack"))
		// t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
		return errors.New("checkpoint rolling back")
	}
	t.checkpoint = checkpoint
	return nil
}

func (t *testEnv) mockPDConnectionError() {
	t.pdDisconnected.Store(true)
}

func (t *testEnv) connectPD() bool {
	if !t.pdDisconnected.Load() {
		return true
	}
	t.pdDisconnected.Store(false)
	return false
}

func (t *testEnv) GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error) {
	if !t.connectPD() {
		return 0, status.Error(codes.Unavailable, "pd disconnected")
	}
	return t.checkpoint, nil
}

func (t *testEnv) ClearV3GlobalCheckpointForTask(ctx context.Context, taskName string) error {
	t.mu.Lock()
	defer t.mu.Unlock()

	t.checkpoint = 0
	return nil
}

func (t *testEnv) PauseTask(ctx context.Context, taskName string, _ ...streamhelper.PauseTaskOption) error {
	t.taskCh <- streamhelper.TaskEvent{
		Type: streamhelper.EventPause,
		Name: taskName,
	}
	return nil
}

func (t *testEnv) ResumeTask(ctx context.Context) error {
	t.taskCh <- streamhelper.TaskEvent{
		Type: streamhelper.EventResume,
		Name: "whole",
	}
	return nil
}

func (t *testEnv) getCheckpoint() uint64 {
	t.mu.Lock()
	defer t.mu.Unlock()

	return t.checkpoint
}

func (t *testEnv) advanceCheckpointBy(duration time.Duration) {
	t.mu.Lock()
	defer t.mu.Unlock()

	log.Info("advance checkpoint", zap.Duration("duration", duration), zap.Uint64("from", t.checkpoint))

	t.checkpoint = oracle.GoTimeToTS(oracle.GetTimeFromTS(t.checkpoint).Add(duration))
}

func (t *testEnv) unregisterTask() {
	t.taskCh <- streamhelper.TaskEvent{
		Type: streamhelper.EventDel,
		Name: "whole",
	}
}

func (t *testEnv) putTask() {
	rngs := t.ranges
	if len(rngs) == 0 {
		rngs = []kv.KeyRange{{}}
	}
	tsk := streamhelper.TaskEvent{
		Type: streamhelper.EventAdd,
		Name: "whole",
		Info: &backup.StreamBackupTaskInfo{
			Name:    "whole",
			StartTs: 0,
		},
		Ranges: rngs,
	}
	t.taskCh <- tsk
}
func (t *testEnv) ScanLocksInOneRegion(bo *tikv.Backoffer, key []byte, endKey []byte, maxVersion uint64, limit uint32) ([]*txnlock.Lock, *tikv.KeyLocation, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.maxTs != maxVersion {
		return nil, nil, errors.Errorf("unexpected max version in scan lock, expected %d, actual %d", t.maxTs, maxVersion)
	}
	for _, r := range t.regions {
		if len(r.locks) != 0 {
			locks := make([]*txnlock.Lock, 0, len(r.locks))
			for _, l := range r.locks {
				// skip locks whose TxnID is not less than maxVersion
				if l.TxnID < maxVersion {
					locks = append(locks, l)
				}
			}
			return locks, &tikv.KeyLocation{
				Region: tikv.NewRegionVerID(r.id, 0, 0),
			}, nil
		}
	}
	return nil, &tikv.KeyLocation{}, nil
}

func (t *testEnv) ResolveLocksInOneRegion(bo *tikv.Backoffer, locks []*txnlock.Lock, loc *tikv.KeyLocation) (*tikv.KeyLocation, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, r := range t.regions {
		if loc != nil && loc.Region.GetID() == r.id {
			// reset locks
			r.locks = nil
			return t.resolveLocks(locks, loc)
		}
	}
	return loc, nil
}

func (t *testEnv) Identifier() string {
	return "advance test"
}

func (t *testEnv) GetStore() tikv.Storage {
	// only used for GetRegionCache once in resolve lock
	return &mockTiKVStore{regionCache: tikv.NewRegionCache(&mockPDClient{fakeRegions: t.regions})}
}
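// mockKVStore and mockTiKVStore are minimal stand-ins for kv.Storage and
// tikv.Storage: only GetRegionCache and SendReq are actually exercised during
// lock resolving; everything else stays unimplemented via the embedded
// interfaces.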
type mockKVStore struct {
	kv.Storage
}

type mockTiKVStore struct {
	mockKVStore
	tikv.Storage
	regionCache *tikv.RegionCache
}

func (s *mockTiKVStore) GetRegionCache() *tikv.RegionCache {
	return s.regionCache
}

func (s *mockTiKVStore) SendReq(bo *tikv.Backoffer, req *tikvrpc.Request, regionID tikv.RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) {
	scanResp := kvrpcpb.ScanLockResponse{
		// We don't need to mock locks here, because we already have mock locks in testEnv.ScanLocksInOneRegion.
		// This behaviour is aligned with gc_worker_test.
		Locks:       nil,
		RegionError: nil,
	}
	return &tikvrpc.Response{Resp: &scanResp}, nil
}
type mockPDClient struct {
	pd.Client
	fakeRegions []*region
}

func (p *mockPDClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, _ ...opt.GetRegionOption) ([]*router.Region, error) {
	sort.Slice(p.fakeRegions, func(i, j int) bool {
		return bytes.Compare(p.fakeRegions[i].rng.StartKey, p.fakeRegions[j].rng.StartKey) < 0
	})

	result := make([]*router.Region, 0, len(p.fakeRegions))
	for _, region := range p.fakeRegions {
		if spans.Overlaps(kv.KeyRange{StartKey: key, EndKey: endKey}, region.rng) && len(result) < limit {
			regionInfo := newMockRegion(region.id, region.rng.StartKey, region.rng.EndKey)
			result = append(result, regionInfo)
		} else if bytes.Compare(region.rng.StartKey, key) > 0 {
			break
		}
	}
	return result, nil
}

func (p *mockPDClient) GetStore(_ context.Context, storeID uint64) (*metapb.Store, error) {
	return &metapb.Store{
		Id:      storeID,
		Address: fmt.Sprintf("127.0.0.%d", storeID),
	}, nil
}

func (p *mockPDClient) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) {
	// only used for GetRegionCache once in resolve lock
	return []*metapb.Store{
		{
			Id:      1,
			Address: "127.0.0.1",
		},
	}, nil
}

func (p *mockPDClient) GetClusterID(ctx context.Context) uint64 {
	return 1
}

func (p *mockPDClient) WithCallerComponent(_ caller.Component) pd.Client {
	return p
}

func newMockRegion(regionID uint64, startKey []byte, endKey []byte) *router.Region {
	leader := &metapb.Peer{
		Id:      regionID,
		StoreId: 1,
		Role:    metapb.PeerRole_Voter,
	}

	return &router.Region{
		Meta: &metapb.Region{
			Id:       regionID,
			StartKey: startKey,
			EndKey:   endKey,
			Peers:    []*metapb.Peer{leader},
		},
		Leader: leader,
	}
}
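
// The sketch below shows how these fakes are typically wired together (a
// minimal, illustrative example only; exampleFakeClusterUsage is a
// hypothetical helper, not part of the production API): build a small
// cluster, split and scatter the key space, advance and flush checkpoints,
// then observe the environment's view of the cluster.
func exampleFakeClusterUsage(t *testing.T) {
	// Four stores, with the flush simulator enabled so unflushed regions
	// report errors until a flush happens.
	c := createFakeCluster(t, 4, true)
	c.splitAndScatter("0001", "0002", "0008")

	env := newTestEnv(c, t)

	// Move per-region checkpoints forward, then flush so the new
	// checkpoints become observable through GetLastFlushTSOfRegion.
	c.advanceCheckpoints()
	c.flushAll()

	t.Log(c.String())
	t.Log("uploaded checkpoint:", env.getCheckpoint())
}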