gc_worker: fix serveral bugs of green gc (#16413)
This commit is contained in:
@ -42,4 +42,8 @@ type Cluster interface {
|
||||
SplitIndex(tableID, indexID int64, count int)
|
||||
// SplitKeys evenly splits the start, end key into "count" regions.
|
||||
SplitKeys(start, end kv.Key, count int)
|
||||
// AddStore adds a new Store to the cluster.
|
||||
AddStore(storeID uint64, addr string)
|
||||
// RemoveStore removes a Store from the cluster.
|
||||
RemoveStore(storeID uint64)
|
||||
}
|
||||
|
||||
@ -925,7 +925,6 @@ func (w *GCWorker) resolveLocks(ctx context.Context, safePoint uint64, concurren
|
||||
|
||||
// First try resolve locks with physical scan
|
||||
err := w.resolveLocksPhysical(ctx, safePoint)
|
||||
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
@ -1071,27 +1070,31 @@ func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) e
|
||||
zap.Uint64("safePoint", safePoint))
|
||||
startTime := time.Now()
|
||||
|
||||
stores, err := w.getUpStoresMapForGC(ctx)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
registeredStores := make(map[uint64]*metapb.Store)
|
||||
defer w.removeLockObservers(ctx, safePoint, registeredStores)
|
||||
|
||||
defer func() {
|
||||
w.removeLockObservers(ctx, safePoint, stores)
|
||||
}()
|
||||
|
||||
err = w.registerLockObservers(ctx, safePoint, stores)
|
||||
dirtyStores, err := w.getUpStoresMapForGC(ctx)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
for retry := 0; retry < 3; retry++ {
|
||||
resolvedStores, err := w.physicalScanAndResolveLocks(ctx, safePoint, stores)
|
||||
err = w.registerLockObservers(ctx, safePoint, dirtyStores)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
for id, store := range dirtyStores {
|
||||
registeredStores[id] = store
|
||||
}
|
||||
|
||||
resolvedStores, err := w.physicalScanAndResolveLocks(ctx, safePoint, dirtyStores)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
stores, err = w.getUpStoresMapForGC(ctx)
|
||||
failpoint.Inject("beforeCheckLockObservers", func() {})
|
||||
|
||||
stores, err := w.getUpStoresMapForGC(ctx)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -1101,22 +1104,38 @@ func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) e
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// Remove clean stores from the set
|
||||
for resolvedStore := range resolvedStores {
|
||||
// Only stores that are both resolved and checked is clean.
|
||||
// For each clean store, remove it from the stores set.
|
||||
if _, ok := checkedStores[resolvedStore]; ok {
|
||||
delete(stores, resolvedStore)
|
||||
for store := range stores {
|
||||
if _, ok := checkedStores[store]; ok {
|
||||
// The store is resolved and checked.
|
||||
if _, ok := resolvedStores[store]; ok {
|
||||
delete(stores, store)
|
||||
}
|
||||
// The store is checked and has been resolved before.
|
||||
if _, ok := dirtyStores[store]; !ok {
|
||||
delete(stores, store)
|
||||
}
|
||||
// If the store is checked and not resolved, we can retry to resolve it again, so leave it in dirtyStores.
|
||||
} else if _, ok := registeredStores[store]; ok {
|
||||
// The store has been registered and it's dirty due to too many collected locks. Fall back to legacy mode.
|
||||
// We can't remove the lock observer from the store and retry the whole procedure because if the store
|
||||
// receives duplicated remove and register requests during resolving locks, the store will be cleaned
|
||||
// when checking but the lock observer drops some locks. It may results in missing locks.
|
||||
return errors.Errorf("store %v is dirty", store)
|
||||
}
|
||||
}
|
||||
dirtyStores = stores
|
||||
|
||||
// If there are still dirty stores, continue the loop to clean them again.
|
||||
// Only dirty stores will be scanned in the next loop.
|
||||
if len(stores) == 0 {
|
||||
if len(dirtyStores) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if len(dirtyStores) != 0 {
|
||||
return errors.Errorf("still has %d dirty stores after physical resolve locks", len(dirtyStores))
|
||||
}
|
||||
|
||||
logutil.Logger(ctx).Info("[gc worker] finish resolve locks with physical scan locks",
|
||||
zap.String("uuid", w.uuid),
|
||||
zap.Uint64("safePoint", safePoint),
|
||||
@ -1141,7 +1160,9 @@ func (w *GCWorker) registerLockObservers(ctx context.Context, safePoint uint64,
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if resp.Resp == nil {
|
||||
return errors.Trace(tikv.ErrBodyMissing)
|
||||
}
|
||||
errStr := resp.Resp.(*kvrpcpb.RegisterLockObserverResponse).Error
|
||||
if len(errStr) > 0 {
|
||||
return errors.Errorf("register lock observer on store %v returns error: %v", store.Id, errStr)
|
||||
@ -1161,31 +1182,41 @@ func (w *GCWorker) checkLockObservers(ctx context.Context, safePoint uint64, sto
|
||||
req := tikvrpc.NewRequest(tikvrpc.CmdCheckLockObserver, &kvrpcpb.CheckLockObserverRequest{
|
||||
MaxTs: safePoint,
|
||||
})
|
||||
|
||||
cleanStores := make(map[uint64]interface{}, len(stores))
|
||||
|
||||
logError := func(store *metapb.Store, err error) {
|
||||
logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store",
|
||||
zap.String("uuid", w.uuid),
|
||||
zap.Any("store", store),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
// When error occurs, this function doesn't fail immediately, but continues without adding the failed store to
|
||||
// cleanStores set.
|
||||
|
||||
for _, store := range stores {
|
||||
address := store.Address
|
||||
|
||||
resp, err := w.store.GetTiKVClient().SendRequest(ctx, address, req, tikv.AccessLockObserverTimeout)
|
||||
if err != nil {
|
||||
logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store",
|
||||
zap.String("uuid", w.uuid),
|
||||
zap.Any("store", store),
|
||||
zap.Error(err))
|
||||
logError(store, err)
|
||||
continue
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
logError(store, tikv.ErrBodyMissing)
|
||||
continue
|
||||
}
|
||||
|
||||
respInner := resp.Resp.(*kvrpcpb.CheckLockObserverResponse)
|
||||
if len(respInner.Error) > 0 {
|
||||
err = errors.Errorf("check lock observer on store %v returns error: %v", store.Id, respInner.Error)
|
||||
logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store",
|
||||
logError(store, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// No need to resolve observed locks on uncleaned stores.
|
||||
if !respInner.IsClean {
|
||||
logutil.Logger(ctx).Warn("[gc worker] check lock observer: store is not clean",
|
||||
zap.String("uuid", w.uuid),
|
||||
zap.Any("store", store),
|
||||
zap.Error(err))
|
||||
zap.Any("store", store))
|
||||
continue
|
||||
}
|
||||
|
||||
@ -1202,21 +1233,11 @@ func (w *GCWorker) checkLockObservers(ctx context.Context, safePoint uint64, sto
|
||||
|
||||
if err != nil {
|
||||
err = errors.Errorf("check lock observer on store %v returns error: %v", store.Id, respInner.Error)
|
||||
logutil.Logger(ctx).Error("[gc worker] failed to check lock observer for store",
|
||||
zap.String("uuid", w.uuid),
|
||||
zap.Any("store", store),
|
||||
zap.Error(err))
|
||||
logError(store, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if respInner.IsClean {
|
||||
cleanStores[store.Id] = nil
|
||||
} else {
|
||||
logutil.Logger(ctx).Warn("[gc worker] check lock observer: store is not clean",
|
||||
zap.String("uuid", w.uuid),
|
||||
zap.Any("store", store))
|
||||
}
|
||||
cleanStores[store.Id] = nil
|
||||
}
|
||||
|
||||
return cleanStores, nil
|
||||
@ -1231,25 +1252,29 @@ func (w *GCWorker) removeLockObservers(ctx context.Context, safePoint uint64, st
|
||||
MaxTs: safePoint,
|
||||
})
|
||||
|
||||
logError := func(store *metapb.Store, err error) {
|
||||
logutil.Logger(ctx).Warn("[gc worker] failed to remove lock observer from store",
|
||||
zap.String("uuid", w.uuid),
|
||||
zap.Any("store", store),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
for _, store := range stores {
|
||||
address := store.Address
|
||||
|
||||
resp, err := w.store.GetTiKVClient().SendRequest(ctx, address, req, tikv.AccessLockObserverTimeout)
|
||||
if err != nil {
|
||||
logutil.Logger(ctx).Warn("[gc worker] failed to remove lock observer from store",
|
||||
zap.String("uuid", w.uuid),
|
||||
zap.Any("store", store),
|
||||
zap.Error(err))
|
||||
logError(store, err)
|
||||
continue
|
||||
}
|
||||
if resp.Resp == nil {
|
||||
logError(store, tikv.ErrBodyMissing)
|
||||
continue
|
||||
}
|
||||
|
||||
errStr := resp.Resp.(*kvrpcpb.RemoveLockObserverResponse).Error
|
||||
if len(errStr) > 0 {
|
||||
err = errors.Errorf("remove lock observer on store %v returns error: %v", store.Id, errStr)
|
||||
logutil.Logger(ctx).Error("[gc worker] failed to remove lock observer from store",
|
||||
zap.String("uuid", w.uuid),
|
||||
zap.Any("store", store),
|
||||
zap.Error(err))
|
||||
logError(store, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1982,12 +2007,10 @@ func (s *mergeLockScanner) physicalScanLocksForStore(ctx context.Context, safePo
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
resp := response.Resp.(*kvrpcpb.PhysicalScanLockResponse)
|
||||
if resp == nil {
|
||||
return errors.Errorf("physical scan lock response is nil")
|
||||
if response.Resp == nil {
|
||||
return errors.Trace(tikv.ErrBodyMissing)
|
||||
}
|
||||
|
||||
resp := response.Resp.(*kvrpcpb.PhysicalScanLockResponse)
|
||||
if len(resp.Error) > 0 {
|
||||
return errors.Errorf("physical scan lock received error from store: %v", resp.Error)
|
||||
}
|
||||
|
||||
@ -20,6 +20,8 @@ import (
|
||||
"math"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -696,8 +698,11 @@ Loop:
|
||||
|
||||
type testGCWorkerClient struct {
|
||||
tikv.Client
|
||||
unsafeDestroyRangeHandler handler
|
||||
physicalScanLockHandler handler
|
||||
unsafeDestroyRangeHandler handler
|
||||
physicalScanLockHandler handler
|
||||
registerLockObserverHandler handler
|
||||
checkLockObserverHandler handler
|
||||
removeLockObserverHandler handler
|
||||
}
|
||||
|
||||
type handler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error)
|
||||
@ -709,6 +714,15 @@ func (c *testGCWorkerClient) SendRequest(ctx context.Context, addr string, req *
|
||||
if req.Type == tikvrpc.CmdPhysicalScanLock && c.physicalScanLockHandler != nil {
|
||||
return c.physicalScanLockHandler(addr, req)
|
||||
}
|
||||
if req.Type == tikvrpc.CmdRegisterLockObserver && c.registerLockObserverHandler != nil {
|
||||
return c.registerLockObserverHandler(addr, req)
|
||||
}
|
||||
if req.Type == tikvrpc.CmdCheckLockObserver && c.checkLockObserverHandler != nil {
|
||||
return c.checkLockObserverHandler(addr, req)
|
||||
}
|
||||
if req.Type == tikvrpc.CmdRemoveLockObserver && c.removeLockObserverHandler != nil {
|
||||
return c.removeLockObserverHandler(addr, req)
|
||||
}
|
||||
|
||||
return c.Client.SendRequest(ctx, addr, req, timeout)
|
||||
}
|
||||
@ -1168,6 +1182,176 @@ func (s *testGCWorkerSuite) TestMergeLockScanner(c *C) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testGCWorkerSuite) TestResolveLocksPhysical(c *C) {
|
||||
alwaysSucceedHanlder := func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
||||
switch req.Type {
|
||||
case tikvrpc.CmdPhysicalScanLock:
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.PhysicalScanLockResponse{Locks: nil, Error: ""}}, nil
|
||||
case tikvrpc.CmdRegisterLockObserver:
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.RegisterLockObserverResponse{Error: ""}}, nil
|
||||
case tikvrpc.CmdCheckLockObserver:
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: true, Locks: nil}}, nil
|
||||
case tikvrpc.CmdRemoveLockObserver:
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.RemoveLockObserverResponse{Error: ""}}, nil
|
||||
default:
|
||||
panic("unreachable")
|
||||
}
|
||||
}
|
||||
alwaysFailHandler := func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
||||
switch req.Type {
|
||||
case tikvrpc.CmdPhysicalScanLock:
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.PhysicalScanLockResponse{Locks: nil, Error: "error"}}, nil
|
||||
case tikvrpc.CmdRegisterLockObserver:
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.RegisterLockObserverResponse{Error: "error"}}, nil
|
||||
case tikvrpc.CmdCheckLockObserver:
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "error", IsClean: false, Locks: nil}}, nil
|
||||
case tikvrpc.CmdRemoveLockObserver:
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.RemoveLockObserverResponse{Error: "error"}}, nil
|
||||
default:
|
||||
panic("unreachable")
|
||||
}
|
||||
}
|
||||
reset := func() {
|
||||
s.client.physicalScanLockHandler = alwaysSucceedHanlder
|
||||
s.client.registerLockObserverHandler = alwaysSucceedHanlder
|
||||
s.client.checkLockObserverHandler = alwaysSucceedHanlder
|
||||
s.client.removeLockObserverHandler = alwaysSucceedHanlder
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// No lock
|
||||
reset()
|
||||
err := s.gcWorker.resolveLocksPhysical(ctx, 10000)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
// Should return error when fails to register lock observers.
|
||||
reset()
|
||||
s.client.registerLockObserverHandler = alwaysFailHandler
|
||||
err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
|
||||
c.Assert(err, ErrorMatches, "register lock observer.*")
|
||||
|
||||
// Should return error when fails to resolve locks.
|
||||
reset()
|
||||
s.client.physicalScanLockHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
||||
locks := []*kvrpcpb.LockInfo{{Key: []byte{0}}}
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.PhysicalScanLockResponse{Locks: locks, Error: ""}}, nil
|
||||
}
|
||||
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/resolveLocksAcrossRegionsErr", "return(100)"), IsNil)
|
||||
err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
|
||||
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/resolveLocksAcrossRegionsErr"), IsNil)
|
||||
c.Assert(err, ErrorMatches, "injectedError")
|
||||
|
||||
// Shouldn't return error when fails to scan locks.
|
||||
reset()
|
||||
var returnError uint32 = 1
|
||||
s.client.physicalScanLockHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
||||
if atomic.CompareAndSwapUint32(&returnError, 1, 0) {
|
||||
return alwaysFailHandler(addr, req)
|
||||
}
|
||||
return alwaysSucceedHanlder(addr, req)
|
||||
}
|
||||
err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
// Should return error if reaches retry limit
|
||||
reset()
|
||||
s.client.physicalScanLockHandler = alwaysFailHandler
|
||||
err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
|
||||
c.Assert(err, ErrorMatches, ".*dirty.*")
|
||||
|
||||
// Should return error when one registered store is dirty.
|
||||
reset()
|
||||
s.client.checkLockObserverHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: false, Locks: nil}}, nil
|
||||
}
|
||||
err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
|
||||
c.Assert(err, ErrorMatches, "store.*dirty")
|
||||
|
||||
// Should return error when fails to check lock observers.
|
||||
reset()
|
||||
s.client.checkLockObserverHandler = alwaysFailHandler
|
||||
err = s.gcWorker.resolveLocksPhysical(ctx, 10000)
|
||||
// When fails to check lock observer in a store, we assume the store is dirty.
|
||||
c.Assert(err, ErrorMatches, "store.*dirty")
|
||||
|
||||
// Shouldn't return error when the dirty store is newly added.
|
||||
reset()
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers", "pause"), IsNil)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
err := s.gcWorker.resolveLocksPhysical(ctx, 10000)
|
||||
c.Assert(err, IsNil)
|
||||
}()
|
||||
// Sleep to let the goroutine pause.
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
s.cluster.AddStore(100, "store100")
|
||||
once := true
|
||||
s.client.checkLockObserverHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
||||
// The newly added store returns IsClean=false for the first time.
|
||||
if addr == "store100" && once {
|
||||
once = false
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: false, Locks: nil}}, nil
|
||||
}
|
||||
return alwaysSucceedHanlder(addr, req)
|
||||
}
|
||||
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers"), IsNil)
|
||||
wg.Wait()
|
||||
|
||||
// Shouldn't return error when a store is removed.
|
||||
reset()
|
||||
wg.Add(1)
|
||||
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers", "pause"), IsNil)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
err := s.gcWorker.resolveLocksPhysical(ctx, 10000)
|
||||
c.Assert(err, IsNil)
|
||||
}()
|
||||
// Sleep to let the goroutine pause.
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
s.cluster.RemoveStore(100)
|
||||
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers"), IsNil)
|
||||
wg.Wait()
|
||||
|
||||
// Should return error when a cleaned store becomes dirty.
|
||||
reset()
|
||||
wg.Add(1)
|
||||
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers", "pause"), IsNil)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
err := s.gcWorker.resolveLocksPhysical(ctx, 10000)
|
||||
c.Assert(err, ErrorMatches, "store.*dirty")
|
||||
}()
|
||||
// Sleep to let the goroutine pause.
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
store := s.cluster.GetAllStores()[0]
|
||||
var onceClean uint32 = 1
|
||||
s.cluster.AddStore(100, "store100")
|
||||
var onceDirty uint32 = 1
|
||||
s.client.checkLockObserverHandler = func(addr string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
|
||||
switch addr {
|
||||
case "store100":
|
||||
// The newly added store returns IsClean=false for the first time.
|
||||
if atomic.CompareAndSwapUint32(&onceDirty, 1, 0) {
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: false, Locks: nil}}, nil
|
||||
}
|
||||
return alwaysSucceedHanlder(addr, req)
|
||||
case store.Address:
|
||||
// The store returns IsClean=true for the first time.
|
||||
if atomic.CompareAndSwapUint32(&onceClean, 1, 0) {
|
||||
return alwaysSucceedHanlder(addr, req)
|
||||
}
|
||||
return &tikvrpc.Response{Resp: &kvrpcpb.CheckLockObserverResponse{Error: "", IsClean: false, Locks: nil}}, nil
|
||||
default:
|
||||
return alwaysSucceedHanlder(addr, req)
|
||||
}
|
||||
}
|
||||
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/gcworker/beforeCheckLockObservers"), IsNil)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (s *testGCWorkerSuite) TestPhyscailScanLockDeadlock(c *C) {
|
||||
ctx := context.Background()
|
||||
stores := s.cluster.GetAllStores()
|
||||
|
||||
Reference in New Issue
Block a user