tikv: use timestamp from TSO instead of MaxVersion in point get retry (#22789)
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
1
go.sum
1
go.sum
@ -463,6 +463,7 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR
|
||||
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
|
||||
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
|
||||
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44 h1:tB9NOR21++IjLyVx3/PCPhWMwqGNCMQEH96A6dMZ/gc=
|
||||
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
|
||||
github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
|
||||
github.com/shirou/gopsutil v3.20.12+incompatible h1:6VEGkOXP/eP4o2Ilk8cSsX0PhOEfX6leqAnD+urrp9M=
|
||||
|
||||
@ -32,6 +32,7 @@ import (
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/store/tikv/logutil"
|
||||
"github.com/pingcap/tidb/store/tikv/metrics"
|
||||
"github.com/pingcap/tidb/store/tikv/oracle"
|
||||
"github.com/pingcap/tidb/store/tikv/tikvrpc"
|
||||
"github.com/pingcap/tidb/store/tikv/util"
|
||||
"github.com/pingcap/tidb/tablecodec"
|
||||
@ -78,6 +79,7 @@ type tikvSnapshot struct {
|
||||
taskID uint64
|
||||
}
|
||||
sampleStep uint32
|
||||
txnScope string
|
||||
}
|
||||
|
||||
// newTiKVSnapshot creates a snapshot of an TiKV store.
|
||||
@ -402,6 +404,15 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte
|
||||
})
|
||||
|
||||
cli := NewClientHelper(s.store, s.resolvedLocks)
|
||||
|
||||
// Secondary locks or async commit locks cannot be ignored when getting using the max version.
|
||||
// So we concurrently get a TS from PD and use it in retries to avoid unnecessary blocking.
|
||||
var tsFuture oracle.Future
|
||||
if s.version == kv.MaxVersion {
|
||||
tsFuture = s.store.oracle.GetTimestampAsync(ctx, &oracle.Option{TxnScope: s.txnScope})
|
||||
}
|
||||
failpoint.Inject("snapshotGetTSAsync", nil)
|
||||
|
||||
s.mu.RLock()
|
||||
if s.mu.stats != nil {
|
||||
cli.Stats = make(map[tikvrpc.CmdType]*RPCRuntimeStats)
|
||||
@ -419,6 +430,7 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte
|
||||
TaskId: s.mu.taskID,
|
||||
})
|
||||
s.mu.RUnlock()
|
||||
|
||||
for {
|
||||
loc, err := s.store.regionCache.LocateKey(bo, k)
|
||||
if err != nil {
|
||||
@ -452,6 +464,20 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
if s.version == kv.MaxVersion {
|
||||
newTS, err := tsFuture.Wait()
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
s.version = kv.NewVersion(newTS)
|
||||
req.Req.(*pb.GetRequest).Version = newTS
|
||||
// skip lock resolving and backoff if the lock does not block the read
|
||||
if newTS < lock.TxnID || newTS < lock.MinCommitTS {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
msBeforeExpired, err := cli.ResolveLocks(bo, s.version.Ver, []*Lock{lock})
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
@ -526,6 +552,8 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
|
||||
s.mu.Unlock()
|
||||
case kv.SampleStep:
|
||||
s.sampleStep = val.(uint32)
|
||||
case kv.TxnScope:
|
||||
s.txnScope = val.(string)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -118,3 +118,33 @@ func (s *testSnapshotFailSuite) TestScanResponseKeyError(c *C) {
|
||||
c.Assert(iter.Valid(), IsFalse)
|
||||
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/rpcScanResult"), IsNil)
|
||||
}
|
||||
|
||||
func (s *testSnapshotFailSuite) TestRetryPointGetWithTS(c *C) {
|
||||
snapshot := s.store.GetSnapshot(kv.MaxVersion)
|
||||
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/snapshotGetTSAsync", `pause`), IsNil)
|
||||
ch := make(chan error)
|
||||
go func() {
|
||||
_, err := snapshot.Get(context.Background(), []byte("k4"))
|
||||
ch <- err
|
||||
}()
|
||||
|
||||
txn, err := s.store.Begin()
|
||||
c.Assert(err, IsNil)
|
||||
err = txn.Set([]byte("k4"), []byte("v4"))
|
||||
c.Assert(err, IsNil)
|
||||
txn.SetOption(kv.EnableAsyncCommit, true)
|
||||
txn.SetOption(kv.GuaranteeLinearizability, false)
|
||||
// Prewrite an async-commit lock and do not commit it.
|
||||
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing", `return`), IsNil)
|
||||
committer, err := newTwoPhaseCommitterWithInit(txn.(*tikvTxn), 1)
|
||||
c.Assert(err, IsNil)
|
||||
// Sets its minCommitTS to a large value, so the lock can be actually ignored.
|
||||
committer.minCommitTS = committer.startTS + (1 << 28)
|
||||
err = committer.execute(context.Background())
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/asyncCommitDoNothing"), IsNil)
|
||||
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/snapshotGetTSAsync"), IsNil)
|
||||
|
||||
err = <-ch
|
||||
c.Assert(err, ErrorMatches, ".*key not exist")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user