diff --git a/store/mockstore/cluster/cluster.go b/store/mockstore/cluster/cluster.go
index 43d5ecae11..ee7f2fd7ac 100644
--- a/store/mockstore/cluster/cluster.go
+++ b/store/mockstore/cluster/cluster.go
@@ -42,4 +42,8 @@ type Cluster interface {
 	SplitIndex(tableID, indexID int64, count int)
 	// SplitKeys evenly splits the start, end key into "count" regions.
 	SplitKeys(start, end kv.Key, count int)
+	// AddStore adds a new Store to the cluster.
+	AddStore(storeID uint64, addr string)
+	// RemoveStore removes a Store from the cluster.
+	RemoveStore(storeID uint64)
 }
diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go
index c044d3eb95..8fcaf64601 100644
--- a/store/tikv/gcworker/gc_worker.go
+++ b/store/tikv/gcworker/gc_worker.go
@@ -925,7 +925,6 @@ func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64, concurren
 
 	// First try resolve locks with physical scan
 	err := w.resolveLocksPhysical(ctx, safePoint)
-
 	if err == nil {
 		return nil
 	}
@@ -1071,27 +1070,31 @@ func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) e
 		zap.Uint64("safePoint", safePoint))
 	startTime := time.Now()
 
-	stores, err := w.getUpStoresMapForGC(ctx)
-	if err != nil {
-		return errors.Trace(err)
-	}
+	registeredStores := make(map[uint64]*metapb.Store)
+	defer w.removeLockObservers(ctx, safePoint, registeredStores)
 
-	defer func() {
-		w.removeLockObservers(ctx, safePoint, stores)
-	}()
-
-	err = w.registerLockObservers(ctx, safePoint, stores)
+	dirtyStores, err := w.getUpStoresMapForGC(ctx)
 	if err != nil {
 		return errors.Trace(err)
 	}
 
 	for retry := 0; retry < 3; retry++ {
-		resolvedStores, err := w.physicalScanAndResolveLocks(ctx, safePoint, stores)
+		err = w.registerLockObservers(ctx, safePoint, dirtyStores)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		for id, store := range dirtyStores {
+			registeredStores[id] = store
+		}
+
+		resolvedStores, err := w.physicalScanAndResolveLocks(ctx, safePoint, dirtyStores)
 		if err != nil {
 			return errors.Trace(err)
 		}
 
-		stores, err = w.getUpStoresMapForGC(ctx)
+		failpoint.Inject("beforeCheckLockObservers", func() {})
+
+		stores, err := w.getUpStoresMapForGC(ctx)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -1101,22 +1104,38 @@ func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) e
 			return errors.Trace(err)
 		}
 
-		// Remove clean stores from the set
-		for resolvedStore := range resolvedStores {
-			// Only stores that are both resolved and checked is clean.
-			// For each clean store, remove it from the stores set.
-			if _, ok := checkedStores[resolvedStore]; ok {
-				delete(stores, resolvedStore)
+		for store := range stores {
+			if _, ok := checkedStores[store]; ok {
+				// The store is resolved and checked.
+				if _, ok := resolvedStores[store]; ok {
+					delete(stores, store)
+				}
+				// The store is checked and has been resolved before.
+				if _, ok := dirtyStores[store]; !ok {
+					delete(stores, store)
+				}
+				// If the store is checked but not resolved, we can retry resolving it, so leave it in dirtyStores.
+			} else if _, ok := registeredStores[store]; ok {
+				// The store has been registered and it's dirty due to too many collected locks. Fall back to legacy mode.
+				// We can't remove the lock observer from the store and retry the whole procedure because if the store
+				// receives duplicated remove and register requests during resolving locks, the store will be cleaned
+				// when checking but the lock observer drops some locks. It may result in missing locks.
+ return errors.Errorf("store %v is dirty", store) } } + dirtyStores = stores // If there are still dirty stores, continue the loop to clean them again. // Only dirty stores will be scanned in the next loop. - if len(stores) == 0 { + if len(dirtyStores) == 0 { break } } + if len(dirtyStores) != 0 { + return errors.Errorf("still has %d dirty stores after physical resolve locks", len(dirtyStores)) + } + logutil.Logger(ctx).Info("[gc worker] finish resolve locks with physical scan locks", zap.String("uuid", w.uuid), zap.Uint64("safePoint", safePoint), @@ -1141,7 +1160,9 @@ func (w *GCWorker) registerLockObservers(ctx context.Context, safePoint uint64, if err != nil { return errors.Trace(err) } - + if resp.Resp == nil { + return errors.Trace(tikv.ErrBodyMissing) + } errStr := resp.Resp.(*kvrpcpb.RegisterLockObserverResponse).Error if len(errStr) > 0 { return errors.Errorf("register lock observer on store %v returns error: %v", store.Id, errStr) @@ -1161,31 +1182,41 @@ func (w *GCWorker) checkLockObservers(ctx context.Context, safePoint uint64, sto req := tikvrpc.NewRequest(tikvrpc.CmdCheckLockObserver, &kvrpcpb.CheckLockObserverRequest{ MaxTs: safePoint, }) - cleanStores := make(map[uint64]interface{}, len(stores)) + logError := func(store *metapb.Store, err error) { + logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store", + zap.String("uuid", w.uuid), + zap.Any("store", store), + zap.Error(err)) + } + // When error occurs, this function doesn't fail immediately, but continues without adding the failed store to // cleanStores set. - for _, store := range stores { address := store.Address resp, err := w.store.GetTiKVClient().SendRequest(ctx, address, req, tikv.AccessLockObserverTimeout) if err != nil { - logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store", - zap.String("uuid", w.uuid), - zap.Any("store", store), - zap.Error(err)) + logError(store, err) + continue + } + if resp.Resp == nil { + logError(store, tikv.ErrBodyMissing) continue } - respInner := resp.Resp.(*kvrpcpb.CheckLockObserverResponse) if len(respInner.Error) > 0 { err = errors.Errorf("check lock observer on store %v returns error: %v", store.Id, respInner.Error) - logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store", + logError(store, err) + continue + } + + // No need to resolve observed locks on uncleaned stores. 
+		if !respInner.IsClean {
+			logutil.Logger(ctx).Warn("[gc worker] check lock observer: store is not clean",
 				zap.String("uuid", w.uuid),
-				zap.Any("store", store),
-				zap.Error(err))
+				zap.Any("store", store))
 			continue
 		}
 
@@ -1202,21 +1233,11 @@ func (w *GCWorker) checkLockObservers(ctx context.Context, safePoint uint64, sto
 			if err != nil {
 				err = errors.Errorf("check lock observer on store %v returns error: %v", store.Id, respInner.Error)
-				logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store",
-					zap.String("uuid", w.uuid),
-					zap.Any("store", store),
-					zap.Error(err))
+				logError(store, err)
 				continue
 			}
 		}
-
-		if respInner.IsClean {
-			cleanStores[store.Id] = nil
-		} else {
-			logutil.Logger(ctx).Warn("[gc worker] check lock observer: store is not clean",
-				zap.String("uuid", w.uuid),
-				zap.Any("store", store))
-		}
+		cleanStores[store.Id] = nil
 	}
 
 	return cleanStores, nil
 }
@@ -1231,25 +1252,29 @@ func (w *GCWorker) removeLockObservers(ctx context.Context, safePoint uint64, st
 		MaxTs: safePoint,
 	})
 
+	logError := func(store *metapb.Store, err error) {
+		logutil.Logger(ctx).Warn("[gc worker] failed to remove lock observer from store",
+			zap.String("uuid", w.uuid),
+			zap.Any("store", store),
+			zap.Error(err))
+	}
+
 	for _, store := range stores {
 		address := store.Address
 
 		resp, err := w.store.GetTiKVClient().SendRequest(ctx, address, req, tikv.AccessLockObserverTimeout)
 		if err != nil {
-			logutil.Logger(ctx).Warn("[gc worker] failed to remove lock observer from store",
-				zap.String("uuid", w.uuid),
-				zap.Any("store", store),
-				zap.Error(err))
+			logError(store, err)
+			continue
+		}
+		if resp.Resp == nil {
+			logError(store, tikv.ErrBodyMissing)
 			continue
 		}
-
 		errStr := resp.Resp.(*kvrpcpb.RemoveLockObserverResponse).Error
 		if len(errStr) > 0 {
 			err = errors.Errorf("remove lock observer on store %v returns error: %v", store.Id, errStr)
-			logutil.Logger(ctx).Error("[gc worker] failed to remove lock observer from store",
-				zap.String("uuid", w.uuid),
-				zap.Any("store", store),
-				zap.Error(err))
+			logError(store, err)
 		}
 	}
 }
@@ -1982,12 +2007,10 @@ func (s *mergeLockScanner) physicalScanLocksForStore(ctx context.Context, safePo
 	if err != nil {
 		return errors.Trace(err)
 	}
-
-	resp := response.Resp.(*kvrpcpb.PhysicalScanLockResponse)
-	if resp == nil {
-		return errors.Errorf("physical scan lock response is nil")
+	if response.Resp == nil {
+		return errors.Trace(tikv.ErrBodyMissing)
 	}
-
+	resp := response.Resp.(*kvrpcpb.PhysicalScanLockResponse)
 	if len(resp.Error) > 0 {
 		return errors.Errorf("physical scan lock received error from store: %v", resp.Error)
 	}
diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go
index 64c94cae2e..009fda92c6 100644
--- a/store/tikv/gcworker/gc_worker_test.go
+++ b/store/tikv/gcworker/gc_worker_test.go
@@ -20,6 +20,8 @@ import (
 	"math"
 	"sort"
 	"strconv"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -696,8 +698,11 @@ Loop:
 type testGCWorkerClient struct {
 	tikv.Client
 
-	unsafeDestroyRangeHandler handler
-	physicalScanLockHandler   handler
+	unsafeDestroyRangeHandler   handler
+	physicalScanLockHandler     handler
+	registerLockObserverHandler handler
+	checkLockObserverHandler    handler
+	removeLockObserverHandler   handler
 }
 
 type handler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error)
@@ -709,6 +714,15 @@ func (c *testGCWorkerClient) SendRequest(ctx context.Context, addr string, req *
 	if req.Type == tikvrpc.CmdPhysicalScanLock && c.physicalScanLockHandler != nil {
 		return c.physicalScanLockHandler(addr, req)
 	}
+	if req.Type == tikvrpc.CmdRegisterLockObserver && c.registerLockObserverHandler != nil {
+		return c.registerLockObserverHandler(addr, req)
+	}
+	if req.Type == tikvrpc.CmdCheckLockObserver && c.checkLockObserverHandler != nil {
+		return c.checkLockObserverHandler(addr, req)
+	}
+	if req.Type == tikvrpc.CmdRemoveLockObserver && c.removeLockObserverHandler != nil {
+		return c.removeLockObserverHandler(addr, req)
+	}
 
 	return c.Client.SendRequest(ctx, addr, req, timeout)
 }
@@ -1168,6 +1182,176 @@ func (s *testGCWorkerSuite) TestMergeLockScanner(c *C) {
 	}
 }
 
+func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) {
+	alwaysSucceedHandler := func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
+		switch req.Type {
+		case tikvrpc.CmdPhysicalScanLock:
+			return &tikvrpc.Response{Resp: &kvrpcpb.PhysicalScanLockResponse{Locks: nil, Error: ""}}, nil
+		case tikvrpc.CmdRegisterLockObserver:
+			return &tikvrpc.Response{Resp: &kvrpcpb.RegisterLockObserverResponse{Error: ""}}, nil
+		case tikvrpc.CmdCheckLockObserver:
+			return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: true, Locks: nil}}, nil
+		case tikvrpc.CmdRemoveLockObserver:
+			return &tikvrpc.Response{Resp: &kvrpcpb.RemoveLockObserverResponse{Error: ""}}, nil
+		default:
+			panic("unreachable")
+		}
+	}
+	alwaysFailHandler := func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
+		switch req.Type {
+		case tikvrpc.CmdPhysicalScanLock:
+			return &tikvrpc.Response{Resp: &kvrpcpb.PhysicalScanLockResponse{Locks: nil, Error: "error"}}, nil
+		case tikvrpc.CmdRegisterLockObserver:
+			return &tikvrpc.Response{Resp: &kvrpcpb.RegisterLockObserverResponse{Error: "error"}}, nil
+		case tikvrpc.CmdCheckLockObserver:
+			return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "error", IsClean: false, Locks: nil}}, nil
+		case tikvrpc.CmdRemoveLockObserver:
+			return &tikvrpc.Response{Resp: &kvrpcpb.RemoveLockObserverResponse{Error: "error"}}, nil
+		default:
+			panic("unreachable")
+		}
+	}
+	reset := func() {
+		s.client.physicalScanLockHandler = alwaysSucceedHandler
+		s.client.registerLockObserverHandler = alwaysSucceedHandler
+		s.client.checkLockObserverHandler = alwaysSucceedHandler
+		s.client.removeLockObserverHandler = alwaysSucceedHandler
+	}
+
+	ctx := context.Background()
+
+	// No lock.
+	reset()
+	err := s.gcWorker.resolveLocksPhysical(ctx, 10000)
+	c.Assert(err, IsNil)
+
+	// Should return error when it fails to register lock observers.
+	reset()
+	s.client.registerLockObserverHandler = alwaysFailHandler
+	err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
+	c.Assert(err, ErrorMatches, "register lock observer.*")
+
+	// Should return error when it fails to resolve locks.
+	reset()
+	s.client.physicalScanLockHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
+		locks := []*kvrpcpb.LockInfo{{Key: []byte{0}}}
+		return &tikvrpc.Response{Resp: &kvrpcpb.PhysicalScanLockResponse{Locks: locks, Error: ""}}, nil
+	}
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/resolveLocksAcrossRegionsErr", "return(100)"), IsNil)
+	err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
+	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/resolveLocksAcrossRegionsErr"), IsNil)
+	c.Assert(err, ErrorMatches, "injectedError")
+
+	// Shouldn't return error when it fails to scan locks.
+	reset()
+	var returnError uint32 = 1
+	s.client.physicalScanLockHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
+		if atomic.CompareAndSwapUint32(&returnError, 1, 0) {
+			return alwaysFailHandler(addr, req)
+		}
+		return alwaysSucceedHandler(addr, req)
+	}
+	err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
+	c.Assert(err, IsNil)
+
+	// Should return error if it reaches the retry limit.
+	reset()
+	s.client.physicalScanLockHandler = alwaysFailHandler
+	err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
+	c.Assert(err, ErrorMatches, ".*dirty.*")
+
+	// Should return error when one registered store is dirty.
+	reset()
+	s.client.checkLockObserverHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
+		return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: false, Locks: nil}}, nil
+	}
+	err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
+	c.Assert(err, ErrorMatches, "store.*dirty")
+
+	// Should return error when it fails to check lock observers.
+	reset()
+	s.client.checkLockObserverHandler = alwaysFailHandler
+	err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
+	// When we fail to check the lock observer on a store, we assume the store is dirty.
+	c.Assert(err, ErrorMatches, "store.*dirty")
+
+	// Shouldn't return error when the dirty store is newly added.
+	reset()
+	var wg sync.WaitGroup
+	wg.Add(1)
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers", "pause"), IsNil)
+	go func() {
+		defer wg.Done()
+		err := s.gcWorker.resolveLocksPhysical(ctx, 10000)
+		c.Assert(err, IsNil)
+	}()
+	// Sleep to let the goroutine pause.
+	time.Sleep(500 * time.Millisecond)
+	s.cluster.AddStore(100, "store100")
+	once := true
+	s.client.checkLockObserverHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
+		// The newly added store returns IsClean=false for the first time.
+		if addr == "store100" && once {
+			once = false
+			return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: false, Locks: nil}}, nil
+		}
+		return alwaysSucceedHandler(addr, req)
+	}
+	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers"), IsNil)
+	wg.Wait()
+
+	// Shouldn't return error when a store is removed.
+	reset()
+	wg.Add(1)
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers", "pause"), IsNil)
+	go func() {
+		defer wg.Done()
+		err := s.gcWorker.resolveLocksPhysical(ctx, 10000)
+		c.Assert(err, IsNil)
+	}()
+	// Sleep to let the goroutine pause.
+	time.Sleep(500 * time.Millisecond)
+	s.cluster.RemoveStore(100)
+	c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers"), IsNil)
+	wg.Wait()
+
+	// Should return error when a cleaned store becomes dirty.
+	reset()
+	wg.Add(1)
+	c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers", "pause"), IsNil)
+	go func() {
+		defer wg.Done()
+		err := s.gcWorker.resolveLocksPhysical(ctx, 10000)
+		c.Assert(err, ErrorMatches, "store.*dirty")
+	}()
+	// Sleep to let the goroutine pause.
+	time.Sleep(500 * time.Millisecond)
+	store := s.cluster.GetAllStores()[0]
+	var onceClean uint32 = 1
+	s.cluster.AddStore(100, "store100")
+	var onceDirty uint32 = 1
+	s.client.checkLockObserverHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
+		switch addr {
+		case "store100":
+			// The newly added store returns IsClean=false for the first time.
+ if atomic.CompareAndSwapUint32(&onceDirty, 1, 0) { + return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: false, Locks: nil}}, nil + } + return alwaysSucceedHanlder(addr, req) + case store.Address: + // The store returns IsClean=true for the first time. + if atomic.CompareAndSwapUint32(&onceClean, 1, 0) { + return alwaysSucceedHanlder(addr, req) + } + return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: false, Locks: nil}}, nil + default: + return alwaysSucceedHanlder(addr, req) + } + } + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers"), IsNil) + wg.Wait() +} + func (s *testGCWorkerSuite) TestPhyscailScanLockDeadlock(c *C) { ctx := context.Background() stores := s.cluster.GetAllStores()