gc_worker: compare locks by key and timestamp (#16536)

This commit is contained in:
Lei Zhao
2020-04-30 12:11:27 +08:00
committed by GitHub
parent 556e43eef7
commit de8be25b6b
2 changed files with 87 additions and 46 deletions

View File

@ -1805,12 +1805,12 @@ const scanLockResultBufferSize = 128
// mergeLockScanner is used to scan specified stores by using PhysicalScanLock. For multiple stores, the scanner will
// merge the scan results of each store, and remove the duplicating items from different stores.
type mergeLockScanner struct {
safePoint uint64
client tikv.Client
stores map[uint64]*metapb.Store
receivers mergeReceiver
currentLockKey []byte
scanLockLimit uint32
safePoint uint64
client tikv.Client
stores map[uint64]*metapb.Store
receivers mergeReceiver
currentLock *tikv.Lock
scanLockLimit uint32
}
type receiver struct {
@ -1858,8 +1858,8 @@ func (r mergeReceiver) Less(i, j int) bool {
// lhs != nil, so lhs < rhs
return true
}
return bytes.Compare(lhs.Key, rhs.Key) < 0
ord := bytes.Compare(lhs.Key, rhs.Key)
return ord < 0 || (ord == 0 && lhs.TxnID < rhs.TxnID)
}
func (r mergeReceiver) Swap(i, j int) {
@ -1929,15 +1929,15 @@ func (s *mergeLockScanner) startWithReceivers(receivers []*receiver) {
func (s *mergeLockScanner) Next() *tikv.Lock {
for {
nextReceiver := heap.Pop(&s.receivers).(*receiver)
nextReceiver := s.receivers[0]
nextLock := nextReceiver.TakeNextLock()
heap.Push(&s.receivers, nextReceiver)
heap.Fix(&s.receivers, 0)
if nextLock == nil {
return nil
}
if s.currentLockKey == nil || !bytes.Equal(s.currentLockKey, nextLock.Key) {
s.currentLockKey = nextLock.Key
if s.currentLock == nil || !bytes.Equal(s.currentLock.Key, nextLock.Key) || s.currentLock.TxnID != nextLock.TxnID {
s.currentLock = nextLock
return nextLock
}
}

View File

@ -927,6 +927,7 @@ func makeMergedChannel(c *C, count int) (*mergeLockScanner, []chan scanLockResul
scanner := &mergeLockScanner{}
channels := make([]chan scanLockResult, 0, count)
receivers := make([]*receiver, 0, count)
storeIDs := make([]uint64, 0, count)
for i := 0; i < count; i++ {
ch := make(chan scanLockResult, 10)
@ -937,6 +938,7 @@ func makeMergedChannel(c *C, count int) (*mergeLockScanner, []chan scanLockResul
channels = append(channels, ch)
receivers = append(receivers, receiver)
storeIDs = append(storeIDs, uint64(i))
}
resultCh := make(chan []*tikv.Lock)
@ -949,11 +951,6 @@ func makeMergedChannel(c *C, count int) (*mergeLockScanner, []chan scanLockResul
resultCh <- result
}()
storeIDs := make([]uint64, count)
for i := 0; i < count; i++ {
storeIDs[i] = uint64(i)
}
return scanner, channels, storeIDs, resultCh
}
@ -994,7 +991,7 @@ func (s *testGCWorkerSuite) makeMergedMockClient(c *C, count int) (*mergeLockSca
locks = nil
break
}
lockInfo := &kvrpcpb.LockInfo{Key: res.Lock.Key}
lockInfo := &kvrpcpb.LockInfo{Key: res.Lock.Key, LockVersion: res.Lock.TxnID}
locks = append(locks, lockInfo)
}
@ -1035,53 +1032,86 @@ func (s *testGCWorkerSuite) TestMergeLockScanner(c *C) {
return res
}
makeLock := func(key string) *tikv.Lock {
return &tikv.Lock{Key: []byte(key)}
makeLock := func(key string, ts uint64) *tikv.Lock {
return &tikv.Lock{Key: []byte(key), TxnID: ts}
}
makeLockList := func(keys ...string) []*tikv.Lock {
res := make([]*tikv.Lock, 0, len(keys))
for _, k := range keys {
res = append(res, makeLock(k))
makeLockList := func(locks ...*tikv.Lock) []*tikv.Lock {
res := make([]*tikv.Lock, 0, len(locks))
for _, lock := range locks {
res = append(res, lock)
}
return res
}
sendLocks := func(ch chan<- scanLockResult, keys ...string) {
for _, k := range keys {
ch <- scanLockResult{Lock: makeLock(k)}
makeLockListByKey := func(keys ...string) []*tikv.Lock {
res := make([]*tikv.Lock, 0, len(keys))
for _, key := range keys {
res = append(res, makeLock(key, 0))
}
return res
}
sendLocks := func(ch chan<- scanLockResult, locks ...*tikv.Lock) {
for _, lock := range locks {
ch <- scanLockResult{Lock: lock}
}
}
sendLocksByKey := func(ch chan<- scanLockResult, keys ...string) []*tikv.Lock {
locks := make([]*tikv.Lock, 0, len(keys))
for _, key := range keys {
locks = append(locks, makeLock(key, 0))
}
sendLocks(ch, locks...)
return locks
}
sendErr := func(ch chan<- scanLockResult) {
ch <- scanLockResult{Err: errors.New("error")}
}
// No lock.
scanner, sendCh, storeIDs, resCh := makeMergedChannel(c, 1)
close(sendCh[0])
c.Assert(len(<-resCh), Equals, 0)
c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0))
scanner, sendCh, storeIDs, resCh = makeMergedChannel(c, 1)
sendLocks(sendCh[0], "a", "b", "c")
locks := sendLocksByKey(sendCh[0], "a", "b", "c")
close(sendCh[0])
c.Assert(<-resCh, DeepEquals, makeLockList("a", "b", "c"))
c.Assert(<-resCh, DeepEquals, locks)
c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0))
// Send locks with error
scanner, sendCh, storeIDs, resCh = makeMergedChannel(c, 1)
sendLocks(sendCh[0], "a", "b", "c")
locks = sendLocksByKey(sendCh[0], "a", "b", "c")
sendErr(sendCh[0])
close(sendCh[0])
c.Assert(<-resCh, DeepEquals, makeLockList("a", "b", "c"))
c.Assert(<-resCh, DeepEquals, locks)
c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs))
// Merge sort locks with different keys.
scanner, sendCh, storeIDs, resCh = makeMergedChannel(c, 2)
sendLocks(sendCh[0], "a", "c", "e")
locks = sendLocksByKey(sendCh[0], "a", "c", "e")
time.Sleep(time.Millisecond * 100)
sendLocks(sendCh[1], "b", "d", "f")
locks = append(locks, sendLocksByKey(sendCh[1], "b", "d", "f")...)
close(sendCh[0])
close(sendCh[1])
c.Assert(<-resCh, DeepEquals, makeLockList("a", "b", "c", "d", "e", "f"))
sort.Slice(locks, func(i, j int) bool {
return bytes.Compare(locks[i].Key, locks[j].Key) < 0
})
c.Assert(<-resCh, DeepEquals, locks)
c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1))
// Merge sort locks with different timestamps.
scanner, sendCh, storeIDs, resCh = makeMergedChannel(c, 2)
sendLocks(sendCh[0], makeLock("a", 0), makeLock("a", 1))
time.Sleep(time.Millisecond * 100)
sendLocks(sendCh[1], makeLock("a", 1), makeLock("a", 2), makeLock("b", 0))
close(sendCh[0])
close(sendCh[1])
c.Assert(<-resCh, DeepEquals, makeLockList(makeLock("a", 0), makeLock("a", 1), makeLock("a", 2), makeLock("b", 0)))
c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1))
for _, useMock := range []bool{false, true} {
@ -1091,38 +1121,49 @@ func (s *testGCWorkerSuite) TestMergeLockScanner(c *C) {
}
scanner, sendCh, storeIDs, resCh = channel(c, 3)
sendLocks(sendCh[0], "a", "d", "g", "h")
sendLocksByKey(sendCh[0], "a", "d", "g", "h")
time.Sleep(time.Millisecond * 100)
sendLocks(sendCh[1], "a", "d", "f", "h")
sendLocksByKey(sendCh[1], "a", "d", "f", "h")
time.Sleep(time.Millisecond * 100)
sendLocks(sendCh[2], "b", "c", "e", "h")
sendLocksByKey(sendCh[2], "b", "c", "e", "h")
close(sendCh[0])
close(sendCh[1])
close(sendCh[2])
c.Assert(<-resCh, DeepEquals, makeLockList("a", "b", "c", "d", "e", "f", "g", "h"))
c.Assert(<-resCh, DeepEquals, makeLockListByKey("a", "b", "c", "d", "e", "f", "g", "h"))
c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1, 2))
scanner, sendCh, storeIDs, resCh = channel(c, 3)
sendLocks(sendCh[0], "a", "d", "g", "h")
sendLocksByKey(sendCh[0], "a", "d", "g", "h")
time.Sleep(time.Millisecond * 100)
sendLocks(sendCh[1], "a", "d", "f", "h")
sendLocksByKey(sendCh[1], "a", "d", "f", "h")
time.Sleep(time.Millisecond * 100)
sendLocks(sendCh[2], "b", "c", "e", "h")
sendLocksByKey(sendCh[2], "b", "c", "e", "h")
sendErr(sendCh[0])
close(sendCh[0])
close(sendCh[1])
close(sendCh[2])
c.Assert(<-resCh, DeepEquals, makeLockList("a", "b", "c", "d", "e", "f", "g", "h"))
c.Assert(<-resCh, DeepEquals, makeLockListByKey("a", "b", "c", "d", "e", "f", "g", "h"))
c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 1, 2))
scanner, sendCh, storeIDs, resCh = channel(c, 3)
sendLocks(sendCh[0], "a\x00", "a\x00\x00", "b", "b\x00")
sendLocks(sendCh[1], "a", "a\x00\x00", "a\x00\x00\x00", "c")
sendLocks(sendCh[2], "1", "a\x00", "a\x00\x00", "b")
sendLocksByKey(sendCh[0], "a\x00", "a\x00\x00", "b", "b\x00")
sendLocksByKey(sendCh[1], "a", "a\x00\x00", "a\x00\x00\x00", "c")
sendLocksByKey(sendCh[2], "1", "a\x00", "a\x00\x00", "b")
close(sendCh[0])
close(sendCh[1])
close(sendCh[2])
c.Assert(<-resCh, DeepEquals, makeLockList("1", "a", "a\x00", "a\x00\x00", "a\x00\x00\x00", "b", "b\x00", "c"))
c.Assert(<-resCh, DeepEquals, makeLockListByKey("1", "a", "a\x00", "a\x00\x00", "a\x00\x00\x00", "b", "b\x00", "c"))
c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1, 2))
scanner, sendCh, storeIDs, resCh = channel(c, 3)
sendLocks(sendCh[0], makeLock("a", 0), makeLock("d", 0), makeLock("g", 0), makeLock("h", 0))
sendLocks(sendCh[1], makeLock("a", 1), makeLock("b", 0), makeLock("c", 0), makeLock("d", 1))
sendLocks(sendCh[2], makeLock("e", 0), makeLock("g", 1), makeLock("g", 2), makeLock("h", 0))
close(sendCh[0])
close(sendCh[1])
close(sendCh[2])
c.Assert(<-resCh, DeepEquals, makeLockList(makeLock("a", 0), makeLock("a", 1), makeLock("b", 0), makeLock("c", 0),
makeLock("d", 0), makeLock("d", 1), makeLock("e", 0), makeLock("g", 0), makeLock("g", 1), makeLock("g", 2), makeLock("h", 0)))
c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1, 2))
}
}