From de8be25b6bdf62a5bf20a3d77a996483ec409ae3 Mon Sep 17 00:00:00 2001 From: Lei Zhao Date: Thu, 30 Apr 2020 12:11:27 +0800 Subject: [PATCH] gc_worker: compare locks by key and timestamp (#16536) --- store/tikv/gcworker/gc_worker.go | 24 +++--- store/tikv/gcworker/gc_worker_test.go | 109 ++++++++++++++++++-------- 2 files changed, 87 insertions(+), 46 deletions(-) diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index fb12abae79..c044d3eb95 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -1805,12 +1805,12 @@ const scanLockResultBufferSize = 128 // mergeLockScanner is used to scan specified stores by using PhysicalScanLock. For multiple stores, the scanner will // merge the scan results of each store, and remove the duplicating items from different stores. type mergeLockScanner struct { - safePoint uint64 - client tikv.Client - stores map[uint64]*metapb.Store - receivers mergeReceiver - currentLockKey []byte - scanLockLimit uint32 + safePoint uint64 + client tikv.Client + stores map[uint64]*metapb.Store + receivers mergeReceiver + currentLock *tikv.Lock + scanLockLimit uint32 } type receiver struct { @@ -1858,8 +1858,8 @@ func (r mergeReceiver) Less(i, j int) bool { // lhs != nil, so lhs < rhs return true } - - return bytes.Compare(lhs.Key, rhs.Key) < 0 + ord := bytes.Compare(lhs.Key, rhs.Key) + return ord < 0 || (ord == 0 && lhs.TxnID < rhs.TxnID) } func (r mergeReceiver) Swap(i, j int) { @@ -1929,15 +1929,15 @@ func (s *mergeLockScanner) startWithReceivers(receivers []*receiver) { func (s *mergeLockScanner) Next() *tikv.Lock { for { - nextReceiver := heap.Pop(&s.receivers).(*receiver) + nextReceiver := s.receivers[0] nextLock := nextReceiver.TakeNextLock() - heap.Push(&s.receivers, nextReceiver) + heap.Fix(&s.receivers, 0) if nextLock == nil { return nil } - if s.currentLockKey == nil || !bytes.Equal(s.currentLockKey, nextLock.Key) { - s.currentLockKey = nextLock.Key + if s.currentLock == nil || !bytes.Equal(s.currentLock.Key, nextLock.Key) || s.currentLock.TxnID != nextLock.TxnID { + s.currentLock = nextLock return nextLock } } diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index 48fc76ad9b..64c94cae2e 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -927,6 +927,7 @@ func makeMergedChannel(c *C, count int) (*mergeLockScanner, []chan scanLockResul scanner := &mergeLockScanner{} channels := make([]chan scanLockResult, 0, count) receivers := make([]*receiver, 0, count) + storeIDs := make([]uint64, 0, count) for i := 0; i < count; i++ { ch := make(chan scanLockResult, 10) @@ -937,6 +938,7 @@ func makeMergedChannel(c *C, count int) (*mergeLockScanner, []chan scanLockResul channels = append(channels, ch) receivers = append(receivers, receiver) + storeIDs = append(storeIDs, uint64(i)) } resultCh := make(chan []*tikv.Lock) @@ -949,11 +951,6 @@ func makeMergedChannel(c *C, count int) (*mergeLockScanner, []chan scanLockResul resultCh <- result }() - storeIDs := make([]uint64, count) - for i := 0; i < count; i++ { - storeIDs[i] = uint64(i) - } - return scanner, channels, storeIDs, resultCh } @@ -994,7 +991,7 @@ func (s *testGCWorkerSuite) makeMergedMockClient(c *C, count int) (*mergeLockSca locks = nil break } - lockInfo := &kvrpcpb.LockInfo{Key: res.Lock.Key} + lockInfo := &kvrpcpb.LockInfo{Key: res.Lock.Key, LockVersion: res.Lock.TxnID} locks = append(locks, lockInfo) } @@ -1035,53 +1032,86 @@ func (s *testGCWorkerSuite) TestMergeLockScanner(c *C) { return res } - makeLock := func(key string) *tikv.Lock { - return &tikv.Lock{Key: []byte(key)} + makeLock := func(key string, ts uint64) *tikv.Lock { + return &tikv.Lock{Key: []byte(key), TxnID: ts} } - makeLockList := func(keys ...string) []*tikv.Lock { - res := make([]*tikv.Lock, 0, len(keys)) - for _, k := range keys { - res = append(res, makeLock(k)) + makeLockList := func(locks ...*tikv.Lock) []*tikv.Lock { + res := make([]*tikv.Lock, 0, len(locks)) + for _, lock := range locks { + res = append(res, lock) } return res } - sendLocks := func(ch chan<- scanLockResult, keys ...string) { - for _, k := range keys { - ch <- scanLockResult{Lock: makeLock(k)} + makeLockListByKey := func(keys ...string) []*tikv.Lock { + res := make([]*tikv.Lock, 0, len(keys)) + for _, key := range keys { + res = append(res, makeLock(key, 0)) } + return res + } + + sendLocks := func(ch chan<- scanLockResult, locks ...*tikv.Lock) { + for _, lock := range locks { + ch <- scanLockResult{Lock: lock} + } + } + + sendLocksByKey := func(ch chan<- scanLockResult, keys ...string) []*tikv.Lock { + locks := make([]*tikv.Lock, 0, len(keys)) + for _, key := range keys { + locks = append(locks, makeLock(key, 0)) + } + sendLocks(ch, locks...) + return locks } sendErr := func(ch chan<- scanLockResult) { ch <- scanLockResult{Err: errors.New("error")} } + // No lock. scanner, sendCh, storeIDs, resCh := makeMergedChannel(c, 1) close(sendCh[0]) c.Assert(len(<-resCh), Equals, 0) c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0)) scanner, sendCh, storeIDs, resCh = makeMergedChannel(c, 1) - sendLocks(sendCh[0], "a", "b", "c") + locks := sendLocksByKey(sendCh[0], "a", "b", "c") close(sendCh[0]) - c.Assert(<-resCh, DeepEquals, makeLockList("a", "b", "c")) + c.Assert(<-resCh, DeepEquals, locks) c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0)) + // Send locks with error scanner, sendCh, storeIDs, resCh = makeMergedChannel(c, 1) - sendLocks(sendCh[0], "a", "b", "c") + locks = sendLocksByKey(sendCh[0], "a", "b", "c") sendErr(sendCh[0]) close(sendCh[0]) - c.Assert(<-resCh, DeepEquals, makeLockList("a", "b", "c")) + c.Assert(<-resCh, DeepEquals, locks) c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs)) + // Merge sort locks with different keys. scanner, sendCh, storeIDs, resCh = makeMergedChannel(c, 2) - sendLocks(sendCh[0], "a", "c", "e") + locks = sendLocksByKey(sendCh[0], "a", "c", "e") time.Sleep(time.Millisecond * 100) - sendLocks(sendCh[1], "b", "d", "f") + locks = append(locks, sendLocksByKey(sendCh[1], "b", "d", "f")...) close(sendCh[0]) close(sendCh[1]) - c.Assert(<-resCh, DeepEquals, makeLockList("a", "b", "c", "d", "e", "f")) + sort.Slice(locks, func(i, j int) bool { + return bytes.Compare(locks[i].Key, locks[j].Key) < 0 + }) + c.Assert(<-resCh, DeepEquals, locks) + c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1)) + + // Merge sort locks with different timestamps. + scanner, sendCh, storeIDs, resCh = makeMergedChannel(c, 2) + sendLocks(sendCh[0], makeLock("a", 0), makeLock("a", 1)) + time.Sleep(time.Millisecond * 100) + sendLocks(sendCh[1], makeLock("a", 1), makeLock("a", 2), makeLock("b", 0)) + close(sendCh[0]) + close(sendCh[1]) + c.Assert(<-resCh, DeepEquals, makeLockList(makeLock("a", 0), makeLock("a", 1), makeLock("a", 2), makeLock("b", 0))) c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1)) for _, useMock := range []bool{false, true} { @@ -1091,38 +1121,49 @@ func (s *testGCWorkerSuite) TestMergeLockScanner(c *C) { } scanner, sendCh, storeIDs, resCh = channel(c, 3) - sendLocks(sendCh[0], "a", "d", "g", "h") + sendLocksByKey(sendCh[0], "a", "d", "g", "h") time.Sleep(time.Millisecond * 100) - sendLocks(sendCh[1], "a", "d", "f", "h") + sendLocksByKey(sendCh[1], "a", "d", "f", "h") time.Sleep(time.Millisecond * 100) - sendLocks(sendCh[2], "b", "c", "e", "h") + sendLocksByKey(sendCh[2], "b", "c", "e", "h") close(sendCh[0]) close(sendCh[1]) close(sendCh[2]) - c.Assert(<-resCh, DeepEquals, makeLockList("a", "b", "c", "d", "e", "f", "g", "h")) + c.Assert(<-resCh, DeepEquals, makeLockListByKey("a", "b", "c", "d", "e", "f", "g", "h")) c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1, 2)) scanner, sendCh, storeIDs, resCh = channel(c, 3) - sendLocks(sendCh[0], "a", "d", "g", "h") + sendLocksByKey(sendCh[0], "a", "d", "g", "h") time.Sleep(time.Millisecond * 100) - sendLocks(sendCh[1], "a", "d", "f", "h") + sendLocksByKey(sendCh[1], "a", "d", "f", "h") time.Sleep(time.Millisecond * 100) - sendLocks(sendCh[2], "b", "c", "e", "h") + sendLocksByKey(sendCh[2], "b", "c", "e", "h") sendErr(sendCh[0]) close(sendCh[0]) close(sendCh[1]) close(sendCh[2]) - c.Assert(<-resCh, DeepEquals, makeLockList("a", "b", "c", "d", "e", "f", "g", "h")) + c.Assert(<-resCh, DeepEquals, makeLockListByKey("a", "b", "c", "d", "e", "f", "g", "h")) c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 1, 2)) scanner, sendCh, storeIDs, resCh = channel(c, 3) - sendLocks(sendCh[0], "a\x00", "a\x00\x00", "b", "b\x00") - sendLocks(sendCh[1], "a", "a\x00\x00", "a\x00\x00\x00", "c") - sendLocks(sendCh[2], "1", "a\x00", "a\x00\x00", "b") + sendLocksByKey(sendCh[0], "a\x00", "a\x00\x00", "b", "b\x00") + sendLocksByKey(sendCh[1], "a", "a\x00\x00", "a\x00\x00\x00", "c") + sendLocksByKey(sendCh[2], "1", "a\x00", "a\x00\x00", "b") close(sendCh[0]) close(sendCh[1]) close(sendCh[2]) - c.Assert(<-resCh, DeepEquals, makeLockList("1", "a", "a\x00", "a\x00\x00", "a\x00\x00\x00", "b", "b\x00", "c")) + c.Assert(<-resCh, DeepEquals, makeLockListByKey("1", "a", "a\x00", "a\x00\x00", "a\x00\x00\x00", "b", "b\x00", "c")) + c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1, 2)) + + scanner, sendCh, storeIDs, resCh = channel(c, 3) + sendLocks(sendCh[0], makeLock("a", 0), makeLock("d", 0), makeLock("g", 0), makeLock("h", 0)) + sendLocks(sendCh[1], makeLock("a", 1), makeLock("b", 0), makeLock("c", 0), makeLock("d", 1)) + sendLocks(sendCh[2], makeLock("e", 0), makeLock("g", 1), makeLock("g", 2), makeLock("h", 0)) + close(sendCh[0]) + close(sendCh[1]) + close(sendCh[2]) + c.Assert(<-resCh, DeepEquals, makeLockList(makeLock("a", 0), makeLock("a", 1), makeLock("b", 0), makeLock("c", 0), + makeLock("d", 0), makeLock("d", 1), makeLock("e", 0), makeLock("g", 0), makeLock("g", 1), makeLock("g", 2), makeLock("h", 0))) c.Assert(scanner.GetSucceededStores(), DeepEquals, makeIDSet(storeIDs, 0, 1, 2)) } }