From 2bbec477baec6d538847403dd67d862f4abda979 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Mon, 22 Feb 2021 11:22:46 +0800 Subject: [PATCH] tikv: use timestamp from TSO instead of MaxVersion in point get retry (#22789) Signed-off-by: Yilin Chen --- go.sum | 1 + store/tikv/snapshot.go | 28 ++++++++++++++++++++++++++++ store/tikv/snapshot_fail_test.go | 30 ++++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+) diff --git a/go.sum b/go.sum index fe555c57f8..d363a0a12c 100644 --- a/go.sum +++ b/go.sum @@ -463,6 +463,7 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.20.12+incompatible h1:6VEGkOXP/eP4o2Ilk8cSsX0PhOEfX6leqAnD+urrp9M= diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 2f6f3015d2..21c3000a45 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/tablecodec" @@ -78,6 +79,7 @@ type tikvSnapshot struct { taskID uint64 } sampleStep uint32 + txnScope string } // newTiKVSnapshot creates a snapshot of an TiKV store. @@ -402,6 +404,15 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte }) cli := NewClientHelper(s.store, s.resolvedLocks) + + // Secondary locks or async commit locks cannot be ignored when getting using the max version. + // So we concurrently get a TS from PD and use it in retries to avoid unnecessary blocking. + var tsFuture oracle.Future + if s.version == kv.MaxVersion { + tsFuture = s.store.oracle.GetTimestampAsync(ctx, &oracle.Option{TxnScope: s.txnScope}) + } + failpoint.Inject("snapshotGetTSAsync", nil) + s.mu.RLock() if s.mu.stats != nil { cli.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats) @@ -419,6 +430,7 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte TaskId: s.mu.taskID, }) s.mu.RUnlock() + for { loc, err := s.store.regionCache.LocateKey(bo, k) if err != nil { @@ -452,6 +464,20 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte if err != nil { return nil, errors.Trace(err) } + + if s.version == kv.MaxVersion { + newTS, err := tsFuture.Wait() + if err != nil { + return nil, errors.Trace(err) + } + s.version = kv.NewVersion(newTS) + req.Req.(*pb.GetRequest).Version = newTS + // skip lock resolving and backoff if the lock does not block the read + if newTS < lock.TxnID || newTS < lock.MinCommitTS { + continue + } + } + msBeforeExpired, err := cli.ResolveLocks(bo, s.version.Ver, []*Lock{lock}) if err != nil { return nil, errors.Trace(err) @@ -526,6 +552,8 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) { s.mu.Unlock() case kv.SampleStep: s.sampleStep = val.(uint32) + case kv.TxnScope: + s.txnScope = val.(string) } } diff --git a/store/tikv/snapshot_fail_test.go b/store/tikv/snapshot_fail_test.go index cfedcad6f5..b76b6ccf8f 100644 --- a/store/tikv/snapshot_fail_test.go +++ b/store/tikv/snapshot_fail_test.go @@ -118,3 +118,33 @@ func (s *testSnapshotFailSuite) TestScanResponseKeyError(c *C) { c.Assert(iter.Valid(), IsFalse) c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcScanResult"), IsNil) } + +func (s *testSnapshotFailSuite) TestRetryPointGetWithTS(c *C) { + snapshot := s.store.GetSnapshot(kv.MaxVersion) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/snapshotGetTSAsync", `pause`), IsNil) + ch := make(chan error) + go func() { + _, err := snapshot.Get(context.Background(), []byte("k4")) + ch <- err + }() + + txn, err := s.store.Begin() + c.Assert(err, IsNil) + err = txn.Set([]byte("k4"), []byte("v4")) + c.Assert(err, IsNil) + txn.SetOption(kv.EnableAsyncCommit, true) + txn.SetOption(kv.GuaranteeLinearizability, false) + // Prewrite an async-commit lock and do not commit it. + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", `return`), IsNil) + committer, err := newTwoPhaseCommitterWithInit(txn.(*tikvTxn), 1) + c.Assert(err, IsNil) + // Sets its minCommitTS to a large value, so the lock can be actually ignored. + committer.minCommitTS = committer.startTS + (1 << 28) + err = committer.execute(context.Background()) + c.Assert(err, IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/snapshotGetTSAsync"), IsNil) + + err = <-ch + c.Assert(err, ErrorMatches, ".*key not exist") +}