executor, store: Pass the SQL digest down to pessimistic lock request (#24380)
@@ -62,6 +62,7 @@ import (
 	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/memory"
+	"github.com/pingcap/tidb/util/resourcegrouptag"
 	"go.uber.org/zap"
 )
@@ -971,6 +972,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
 }
 
 func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.LockCtx {
+	_, sqlDigest := seVars.StmtCtx.SQLDigest()
 	return &tikvstore.LockCtx{
 		Killed:           &seVars.Killed,
 		ForUpdateTS:      seVars.TxnCtx.GetForUpdateTS(),
@@ -980,6 +982,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.Loc
 		LockKeysDuration: &seVars.StmtCtx.LockKeysDuration,
 		LockKeysCount:    &seVars.StmtCtx.LockKeysCount,
 		LockExpired:      &seVars.TxnCtx.LockExpire,
+		ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(sqlDigest),
 	}
 }
@@ -44,7 +44,10 @@ type DetectorServer struct {
 func (ds *DetectorServer) Detect(req *deadlockPb.DeadlockRequest) *deadlockPb.DeadlockResponse {
 	switch req.Tp {
 	case deadlockPb.DeadlockRequestType_Detect:
-		err := ds.Detector.Detect(req.Entry.Txn, req.Entry.WaitForTxn, req.Entry.KeyHash)
+		err := ds.Detector.Detect(req.Entry.Txn, req.Entry.WaitForTxn, req.Entry.KeyHash, diagnosticContext{
+			key:              req.Entry.Key,
+			resourceGroupTag: req.Entry.ResourceGroupTag,
+		})
 		if err != nil {
 			resp := convertErrToResp(err, req.Entry.Txn, req.Entry.WaitForTxn, req.Entry.KeyHash)
 			return resp
@@ -178,30 +181,35 @@ func (dt *DetectorClient) recvLoop(streamCli deadlockPb.Deadlock_DetectClient) {
 }
 
 func (dt *DetectorClient) handleRemoteTask(requestType deadlockPb.DeadlockRequestType,
-	txnTs uint64, waitForTxnTs uint64, keyHash uint64) {
+	txnTs uint64, waitForTxnTs uint64, keyHash uint64, diagCtx diagnosticContext) {
 	detectReq := &deadlockPb.DeadlockRequest{}
 	detectReq.Tp = requestType
 	detectReq.Entry.Txn = txnTs
 	detectReq.Entry.WaitForTxn = waitForTxnTs
 	detectReq.Entry.KeyHash = keyHash
+	detectReq.Entry.Key = diagCtx.key
+	detectReq.Entry.ResourceGroupTag = diagCtx.resourceGroupTag
 	dt.sendCh <- detectReq
 }
 
 // CleanUp processes cleaup task on local detector
 // user interfaces
 func (dt *DetectorClient) CleanUp(startTs uint64) {
-	dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUp, startTs, 0, 0)
+	dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUp, startTs, 0, 0, diagnosticContext{})
 }
 
 // CleanUpWaitFor cleans up the specific wait edge in detector's wait map
 func (dt *DetectorClient) CleanUpWaitFor(txnTs, waitForTxn, keyHash uint64) {
-	dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUpWaitFor, txnTs, waitForTxn, keyHash)
+	dt.handleRemoteTask(deadlockPb.DeadlockRequestType_CleanUpWaitFor, txnTs, waitForTxn, keyHash, diagnosticContext{})
 }
 
 // Detect post the detection request to local deadlock detector or remote first region leader,
 // the caller should use `waiter.ch` to receive possible deadlock response
-func (dt *DetectorClient) Detect(txnTs uint64, waitForTxnTs uint64, keyHash uint64) {
-	dt.handleRemoteTask(deadlockPb.DeadlockRequestType_Detect, txnTs, waitForTxnTs, keyHash)
+func (dt *DetectorClient) Detect(txnTs uint64, waitForTxnTs uint64, keyHash uint64, key []byte, resourceGroupTag []byte) {
+	dt.handleRemoteTask(deadlockPb.DeadlockRequestType_Detect, txnTs, waitForTxnTs, keyHash, diagnosticContext{
+		key:              key,
+		resourceGroupTag: resourceGroupTag,
+	})
 }
 
 // convertErrToResp converts `ErrDeadlock` to `DeadlockResponse` proto type
@@ -213,6 +221,18 @@ func convertErrToResp(errDeadlock *ErrDeadlock, txnTs, waitForTxnTs, keyHash uin
 	resp := &deadlockPb.DeadlockResponse{}
 	resp.Entry = entry
 	resp.DeadlockKeyHash = errDeadlock.DeadlockKeyHash
+
+	resp.WaitChain = make([]*deadlockPb.WaitForEntry, 0, len(errDeadlock.WaitChain))
+	for _, item := range errDeadlock.WaitChain {
+		resp.WaitChain = append(resp.WaitChain, &deadlockPb.WaitForEntry{
+			Txn:              item.Txn,
+			WaitForTxn:       item.WaitForTxn,
+			KeyHash:          item.KeyHash,
+			Key:              item.Key,
+			ResourceGroupTag: item.ResourceGroupTag,
+		})
+	}
+
 	return resp
 }
@@ -30,6 +30,7 @@ import (
 	"sync"
 	"time"
 
+	deadlockPB "github.com/pingcap/kvproto/pkg/deadlock"
 	"github.com/pingcap/log"
 	"go.uber.org/zap"
 )
@@ -54,6 +55,12 @@ type txnKeyHashPair struct {
 	txn          uint64
 	keyHash      uint64
 	registerTime time.Time
+	diagCtx      diagnosticContext
 }
 
+type diagnosticContext struct {
+	key              []byte
+	resourceGroupTag []byte
+}
+
 func (p *txnKeyHashPair) isExpired(ttl time.Duration, nowTime time.Time) bool {
@@ -75,13 +82,27 @@ func NewDetector(ttl time.Duration, urgentSize uint64, expireInterval time.Durat
 }
 
 // Detect detects deadlock for the sourceTxn on a locked key.
-func (d *Detector) Detect(sourceTxn, waitForTxn, keyHash uint64) *ErrDeadlock {
+func (d *Detector) Detect(sourceTxn, waitForTxn, keyHash uint64, diagCtx diagnosticContext) *ErrDeadlock {
 	d.lock.Lock()
 	nowTime := time.Now()
 	d.activeExpire(nowTime)
 	err := d.doDetect(nowTime, sourceTxn, waitForTxn)
 	if err == nil {
-		d.register(sourceTxn, waitForTxn, keyHash)
+		d.register(sourceTxn, waitForTxn, keyHash, diagCtx)
+	} else {
+		// Reverse the wait chain so that the order will be each one waiting for the next one, and append the current
+		// entry that finally caused the deadlock.
+		for i := 0; i < len(err.WaitChain)/2; i++ {
+			j := len(err.WaitChain) - i - 1
+			err.WaitChain[i], err.WaitChain[j] = err.WaitChain[j], err.WaitChain[i]
+		}
+		err.WaitChain = append(err.WaitChain, &deadlockPB.WaitForEntry{
+			Txn:              sourceTxn,
+			Key:              diagCtx.key,
+			KeyHash:          keyHash,
+			ResourceGroupTag: diagCtx.resourceGroupTag,
+			WaitForTxn:       waitForTxn,
+		})
 	}
 	d.lock.Unlock()
 	return err
@@ -103,9 +124,26 @@ func (d *Detector) doDetect(nowTime time.Time, sourceTxn, waitForTxn uint64) *Er
 			continue
 		}
 		if keyHashPair.txn == sourceTxn {
-			return &ErrDeadlock{DeadlockKeyHash: keyHashPair.keyHash}
+			return &ErrDeadlock{DeadlockKeyHash: keyHashPair.keyHash,
+				WaitChain: []*deadlockPB.WaitForEntry{
+					{
+						Txn:              waitForTxn,
+						Key:              keyHashPair.diagCtx.key,
+						KeyHash:          keyHashPair.keyHash,
+						ResourceGroupTag: keyHashPair.diagCtx.resourceGroupTag,
+						WaitForTxn:       keyHashPair.txn,
+					},
+				},
+			}
 		}
 		if err := d.doDetect(nowTime, sourceTxn, keyHashPair.txn); err != nil {
+			err.WaitChain = append(err.WaitChain, &deadlockPB.WaitForEntry{
+				Txn:              waitForTxn,
+				Key:              keyHashPair.diagCtx.key,
+				KeyHash:          keyHashPair.keyHash,
+				ResourceGroupTag: keyHashPair.diagCtx.resourceGroupTag,
+				WaitForTxn:       keyHashPair.txn,
+			})
 			return err
 		}
 	}
@@ -115,9 +153,9 @@ func (d *Detector) doDetect(nowTime time.Time, sourceTxn, waitForTxn uint64) *Er
 	return nil
 }
 
-func (d *Detector) register(sourceTxn, waitForTxn, keyHash uint64) {
+func (d *Detector) register(sourceTxn, waitForTxn, keyHash uint64, diagCtx diagnosticContext) {
 	val := d.waitForMap[sourceTxn]
-	pair := txnKeyHashPair{txn: waitForTxn, keyHash: keyHash, registerTime: time.Now()}
+	pair := txnKeyHashPair{txn: waitForTxn, keyHash: keyHash, registerTime: time.Now(), diagCtx: diagCtx}
 	if val == nil {
 		newList := &txnList{txns: list.New()}
 		newList.txns.PushBack(&pair)
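
The `else` branch added to `Detect` above reverses the partial chain that `doDetect` builds while unwinding its recursion, then appends the edge that closed the cycle. A minimal standalone sketch of that ordering, using simplified types rather than the unistore ones:

package main

import "fmt"

// waitForEntry mirrors only the two fields needed to show the ordering; the real
// deadlockPB.WaitForEntry also carries Key, KeyHash and ResourceGroupTag.
type waitForEntry struct{ txn, waitForTxn uint64 }

// buildWaitChain reverses the chain reported by the recursive detection (innermost
// edge first) and appends the edge that finally caused the deadlock, so the result
// reads "each transaction waits for the next one".
func buildWaitChain(chain []waitForEntry, sourceTxn, waitForTxn uint64) []waitForEntry {
	for i, j := 0, len(chain)-1; i < j; i, j = i+1, j-1 {
		chain[i], chain[j] = chain[j], chain[i]
	}
	return append(chain, waitForEntry{txn: sourceTxn, waitForTxn: waitForTxn})
}

func main() {
	// Existing edges 1->2 and 2->3; txn 3 now tries to wait for txn 1.
	// The recursion reports the closing edge first: [{2 3} {1 2}].
	fmt.Println(buildWaitChain([]waitForEntry{{2, 3}, {1, 2}}, 3, 1)) // [{1 2} {2 3} {3 1}]
}
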
@@ -31,6 +31,7 @@ import (
 	"time"
 
 	. "github.com/pingcap/check"
+	deadlockPB "github.com/pingcap/kvproto/pkg/deadlock"
 )
 
 func TestT(t *testing.T) {
@@ -42,19 +43,38 @@ var _ = Suite(&testDeadlockSuite{})
 type testDeadlockSuite struct{}
 
 func (s *testDeadlockSuite) TestDeadlock(c *C) {
+	makeDiagCtx := func(key string, resourceGroupTag string) diagnosticContext {
+		return diagnosticContext{
+			key:              []byte(key),
+			resourceGroupTag: []byte(resourceGroupTag),
+		}
+	}
+	checkWaitChainEntry := func(entry *deadlockPB.WaitForEntry, txn, waitForTxn uint64, key, resourceGroupTag string) {
+		c.Assert(entry.Txn, Equals, txn)
+		c.Assert(entry.WaitForTxn, Equals, waitForTxn)
+		c.Assert(string(entry.Key), Equals, key)
+		c.Assert(string(entry.ResourceGroupTag), Equals, resourceGroupTag)
+	}
+
 	ttl := 50 * time.Millisecond
 	expireInterval := 100 * time.Millisecond
 	urgentSize := uint64(1)
 	detector := NewDetector(ttl, urgentSize, expireInterval)
-	err := detector.Detect(1, 2, 100)
+	err := detector.Detect(1, 2, 100, makeDiagCtx("k1", "tag1"))
 	c.Assert(err, IsNil)
 	c.Assert(detector.totalSize, Equals, uint64(1))
-	err = detector.Detect(2, 3, 200)
+	err = detector.Detect(2, 3, 200, makeDiagCtx("k2", "tag2"))
 	c.Assert(err, IsNil)
 	c.Assert(detector.totalSize, Equals, uint64(2))
-	err = detector.Detect(3, 1, 300)
+	err = detector.Detect(3, 1, 300, makeDiagCtx("k3", "tag3"))
 	c.Assert(err, NotNil)
 	c.Assert(err.Error(), Equals, fmt.Sprintf("deadlock"))
+	c.Assert(len(err.WaitChain), Equals, 3)
+	// The order of entries in the wait chain is specific: each item is waiting for the next one.
+	checkWaitChainEntry(err.WaitChain[0], 1, 2, "k1", "tag1")
+	checkWaitChainEntry(err.WaitChain[1], 2, 3, "k2", "tag2")
+	checkWaitChainEntry(err.WaitChain[2], 3, 1, "k3", "tag3")
+
 	c.Assert(detector.totalSize, Equals, uint64(2))
 	detector.CleanUp(2)
 	list2 := detector.waitForMap[2]
@@ -62,20 +82,21 @@ func (s *testDeadlockSuite) TestDeadlock(c *C) {
 	c.Assert(detector.totalSize, Equals, uint64(1))
 
 	// After cycle is broken, no deadlock now.
-	err = detector.Detect(3, 1, 300)
+	diagCtx := diagnosticContext{}
+	err = detector.Detect(3, 1, 300, diagCtx)
 	c.Assert(err, IsNil)
 	list3 := detector.waitForMap[3]
 	c.Assert(list3.txns.Len(), Equals, 1)
 	c.Assert(detector.totalSize, Equals, uint64(2))
 
 	// Different keyHash grows the list.
-	err = detector.Detect(3, 1, 400)
+	err = detector.Detect(3, 1, 400, diagCtx)
 	c.Assert(err, IsNil)
 	c.Assert(list3.txns.Len(), Equals, 2)
 	c.Assert(detector.totalSize, Equals, uint64(3))
 
 	// Same waitFor and key hash doesn't grow the list.
-	err = detector.Detect(3, 1, 400)
+	err = detector.Detect(3, 1, 400, diagCtx)
 	c.Assert(err, IsNil)
 	c.Assert(list3.txns.Len(), Equals, 2)
 	c.Assert(detector.totalSize, Equals, uint64(3))
@@ -90,7 +111,7 @@ func (s *testDeadlockSuite) TestDeadlock(c *C) {
 
 	// after 100ms, all entries expired, detect non exist edges
 	time.Sleep(100 * time.Millisecond)
-	err = detector.Detect(100, 200, 100)
+	err = detector.Detect(100, 200, 100, diagCtx)
 	c.Assert(err, IsNil)
 	c.Assert(detector.totalSize, Equals, uint64(1))
 	c.Assert(len(detector.waitForMap), Equals, 1)
@@ -98,7 +119,7 @@ func (s *testDeadlockSuite) TestDeadlock(c *C) {
 	// expired entry should not report deadlock, detect will remove this entry
 	// not dependent on expire check interval
 	time.Sleep(60 * time.Millisecond)
-	err = detector.Detect(200, 100, 200)
+	err = detector.Detect(200, 100, 200, diagCtx)
 	c.Assert(err, IsNil)
 	c.Assert(detector.totalSize, Equals, uint64(1))
 	c.Assert(len(detector.waitForMap), Equals, 1)
@@ -16,6 +16,7 @@ package tikv
 import (
 	"fmt"
 
+	deadlockPB "github.com/pingcap/kvproto/pkg/deadlock"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/tidb/store/mockstore/unistore/tikv/mvcc"
 )
@@ -90,6 +91,7 @@ type ErrDeadlock struct {
 	LockKey         []byte
 	LockTS          uint64
 	DeadlockKeyHash uint64
+	WaitChain       []*deadlockPB.WaitForEntry
 }
 
 func (e ErrDeadlock) Error() string {
@@ -239,7 +239,11 @@ func (store *MVCCStore) PessimisticLock(reqCtx *requestCtx, req *kvrpcpb.Pessimi
 	for _, m := range mutations {
 		lock, err := store.checkConflictInLockStore(reqCtx, m, startTS)
 		if err != nil {
-			return store.handleCheckPessimisticErr(startTS, err, req.IsFirstLock, req.WaitTimeout)
+			var resourceGroupTag []byte = nil
+			if req.Context != nil {
+				resourceGroupTag = req.Context.ResourceGroupTag
+			}
+			return store.handleCheckPessimisticErr(startTS, err, req.IsFirstLock, req.WaitTimeout, m.Key, resourceGroupTag)
 		}
 		if lock != nil {
 			if lock.Op != uint8(kvrpcpb.Op_PessimisticLock) {
@@ -533,11 +537,13 @@ func (store *MVCCStore) CheckSecondaryLocks(reqCtx *requestCtx, keys [][]byte, s
 func (store *MVCCStore) normalizeWaitTime(lockWaitTime int64) time.Duration {
 	if lockWaitTime > store.conf.PessimisticTxn.WaitForLockTimeout {
 		lockWaitTime = store.conf.PessimisticTxn.WaitForLockTimeout
-	}
+	} else if lockWaitTime == 0 {
+		lockWaitTime = store.conf.PessimisticTxn.WaitForLockTimeout
+	}
 	return time.Duration(lockWaitTime) * time.Millisecond
 }
 
-func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isFirstLock bool, lockWaitTime int64) (*lockwaiter.Waiter, error) {
+func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isFirstLock bool, lockWaitTime int64, key []byte, resourceGroupTag []byte) (*lockwaiter.Waiter, error) {
 	if locked, ok := err.(*ErrLocked); ok {
 		if lockWaitTime != lockwaiter.LockNoWait {
 			keyHash := farm.Fingerprint64(locked.Key)
@@ -546,7 +552,7 @@ func (store *MVCCStore) handleCheckPessimisticErr(startTS uint64, err error, isF
 			log.S().Debugf("%d blocked by %d on key %d", startTS, lock.StartTS, keyHash)
 			waiter := store.lockWaiterManager.NewWaiter(startTS, lock.StartTS, keyHash, waitTimeDuration)
 			if !isFirstLock {
-				store.DeadlockDetectCli.Detect(startTS, lock.StartTS, keyHash)
+				store.DeadlockDetectCli.Detect(startTS, lock.StartTS, keyHash, key, resourceGroupTag)
 			}
 			return waiter, err
 		}
@@ -217,6 +217,7 @@ func (svr *Server) KvPessimisticLock(ctx context.Context, req *kvrpcpb.Pessimist
 			LockKey:         errLocked.Key,
 			LockTS:          errLocked.Lock.StartTS,
 			DeadlockKeyHash: result.DeadlockResp.DeadlockKeyHash,
+			WaitChain:       result.DeadlockResp.WaitChain,
 		}
 		resp.Errors, resp.RegionError = convertToPBErrors(deadlockErr)
 		return resp, nil
@@ -845,11 +846,6 @@ func (svr *Server) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpc
 	return &kvrpcpb.ReadIndexResponse{}, nil
 }
 
-// GetLockWaitInfo implements implements the tikvpb.TikvServer interface.
-func (svr *Server) GetLockWaitInfo(ctx context.Context, _ *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) {
-	return &kvrpcpb.GetLockWaitInfoResponse{}, nil
-}
-
 // transaction debugger commands.
 
 // MvccGetByKey implements implements the tikvpb.TikvServer interface.
@@ -976,6 +972,11 @@ func (svr *Server) GetStoreSafeTS(context.Context, *kvrpcpb.StoreSafeTSRequest)
 	return &kvrpcpb.StoreSafeTSResponse{}, nil
 }
 
+// GetLockWaitInfo implements the tikvpb.TikvServer interface.
+func (svr *Server) GetLockWaitInfo(context.Context, *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) {
+	panic("unimplemented")
+}
+
 func convertToKeyError(err error) *kvrpcpb.KeyError {
 	if err == nil {
 		return nil
@@ -1011,6 +1012,7 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
 				LockKey:         x.LockKey,
 				LockTs:          x.LockTS,
 				DeadlockKeyHash: x.DeadlockKeyHash,
+				WaitChain:       x.WaitChain,
 			},
 		}
 	case *ErrCommitExpire:
@@ -27,4 +27,5 @@ type LockCtx struct {
 	ValuesLock       sync.Mutex
 	LockExpired      *uint32
 	Stats            *util.LockKeysDetails
+	ResourceGroupTag []byte
 }
@@ -101,7 +101,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
 		WaitTimeout:  action.LockWaitTime,
 		ReturnValues: action.ReturnValues,
 		MinCommitTs:  c.forUpdateTS + 1,
-	}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
+	}, pb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: action.LockCtx.ResourceGroupTag})
 	lockWaitStartTime := action.WaitStartTime
 	for {
 		// if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit
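
Taken together with the executor and unistore changes above, the tag now travels from the statement down to the deadlock report. A rough sketch of that flow, using hypothetical mirror structs in place of kv.LockCtx, kvrpcpb.Context and deadlockPb.WaitForEntry:

package main

import "fmt"

// Hypothetical mirror types for illustration only; the real ones are kv.LockCtx,
// kvrpcpb.Context and deadlockPb.WaitForEntry.
type lockCtx struct{ ResourceGroupTag []byte }
type requestContext struct{ ResourceGroupTag []byte }
type waitForEntry struct {
	Txn, WaitForTxn  uint64
	ResourceGroupTag []byte
}

func main() {
	// executor: newLockCtx attaches the encoded SQL digest to the lock context.
	lc := lockCtx{ResourceGroupTag: []byte{1, 1, 2, 0xab, 0x12}}
	// 2PC: the pessimistic lock request copies it into the request Context.
	reqCtx := requestContext{ResourceGroupTag: lc.ResourceGroupTag}
	// detector: the tag is stored with the wait-for edge and reported in the wait chain.
	entry := waitForEntry{Txn: 1, WaitForTxn: 2, ResourceGroupTag: reqCtx.ResourceGroupTag}
	fmt.Printf("%+v\n", entry)
}
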
@@ -463,10 +463,6 @@ func (s *mockTikvGrpcServer) SplitRegion(context.Context, *kvrpcpb.SplitRegionRe
 	return nil, errors.New("unreachable")
 }
 
-func (s *mockTikvGrpcServer) GetLockWaitInfo(context.Context, *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) {
-	return nil, errors.New("unreachable")
-}
-
 func (s *mockTikvGrpcServer) CoprocessorStream(*coprocessor.Request, tikvpb.Tikv_CoprocessorStreamServer) error {
 	return errors.New("unreachable")
 }
@@ -495,6 +491,10 @@ func (s *mockTikvGrpcServer) CoprocessorV2(context.Context, *coprocessor_v2.RawC
 	return nil, errors.New("unreachable")
 }
 
+func (s *mockTikvGrpcServer) GetLockWaitInfo(context.Context, *kvrpcpb.GetLockWaitInfoRequest) (*kvrpcpb.GetLockWaitInfoResponse, error) {
+	return nil, errors.New("unreachable")
+}
+
 func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C) {
 	// prepare a mock tikv grpc server
 	addr := "localhost:56341"
@@ -19,13 +19,17 @@ import (
 	"fmt"
 	"math"
 	"runtime"
+	"sync"
 	"time"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
+	deadlockPB "github.com/pingcap/kvproto/pkg/deadlock"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/tidb/store/tikv"
 	tikverr "github.com/pingcap/tidb/store/tikv/error"
+	"github.com/pingcap/tidb/store/tikv/kv"
 	"github.com/pingcap/tidb/store/tikv/oracle"
 	"github.com/pingcap/tidb/store/tikv/tikvrpc"
 )
@@ -640,3 +644,131 @@ func (s *testLockSuite) TestBatchResolveTxnFallenBackFromAsyncCommit(c *C) {
 	_, err = t3.Get(context.Background(), []byte("fb2"))
 	c.Assert(tikverr.IsErrNotFound(err), IsTrue)
 }
+
+func (s *testLockSuite) TestDeadlockReportWaitChain(c *C) {
+	// Utilities to make the test logic clear and simple.
+	type txnWrapper struct {
+		tikv.TxnProbe
+		wg sync.WaitGroup
+	}
+
+	makeLockCtx := func(txn *txnWrapper, resourceGroupTag string) *kv.LockCtx {
+		return &kv.LockCtx{
+			ForUpdateTS:      txn.StartTS(),
+			WaitStartTime:    time.Now(),
+			LockWaitTime:     1000,
+			ResourceGroupTag: []byte(resourceGroupTag),
+		}
+	}
+
+	// Prepares several transactions and each locks a key.
+	prepareTxns := func(num int) []*txnWrapper {
+		res := make([]*txnWrapper, 0, num)
+		for i := 0; i < num; i++ {
+			txnProbe, err := s.store.Begin()
+			c.Assert(err, IsNil)
+			txn := &txnWrapper{TxnProbe: txnProbe}
+			txn.SetPessimistic(true)
+			tag := fmt.Sprintf("tag-init%v", i)
+			key := []byte{'k', byte(i)}
+			err = txn.LockKeys(context.Background(), makeLockCtx(txn, tag), key)
+			c.Assert(err, IsNil)
+
+			res = append(res, txn)
+		}
+		return res
+	}
+
+	// Let the i-th trnasaction lock the key that has been locked by j-th transaction
+	tryLock := func(txns []*txnWrapper, i int, j int) error {
+		c.Logf("txn %v try locking %v", i, j)
+		txn := txns[i]
+		tag := fmt.Sprintf("tag-%v-%v", i, j)
+		key := []byte{'k', byte(j)}
+		return txn.LockKeys(context.Background(), makeLockCtx(txn, tag), key)
+	}
+
+	// Asserts the i-th transaction waits for the j-th transaction.
+	makeWaitFor := func(txns []*txnWrapper, i int, j int) {
+		txns[i].wg.Add(1)
+		go func() {
+			defer txns[i].wg.Done()
+			err := tryLock(txns, i, j)
+			// After the lock being waited for is released, the transaction returns a WriteConflict error
+			// unconditionally, which is by design.
+			c.Assert(err, NotNil)
+			c.Logf("txn %v wait for %v finished, err: %s", i, j, err.Error())
+			_, ok := errors.Cause(err).(*tikverr.ErrWriteConflict)
+			c.Assert(ok, IsTrue)
+		}()
+	}
+
+	waitAndRollback := func(txns []*txnWrapper, i int) {
+		// It's expected that each transaction should be rolled back after its blocker, so that `Rollback` will not
+		// run when there's concurrent `LockKeys` running.
+		// If it's blocked on the `Wait` forever, it means the transaction's blocker is not rolled back.
+		c.Logf("rollback txn %v", i)
+		txns[i].wg.Wait()
+		err := txns[i].Rollback()
+		c.Assert(err, IsNil)
+	}
+
+	// Check the given WaitForEntry is caused by txn[i] waiting for txn[j].
+	checkWaitChainEntry := func(txns []*txnWrapper, entry *deadlockPB.WaitForEntry, i, j int) {
+		c.Assert(entry.Txn, Equals, txns[i].StartTS())
+		c.Assert(entry.WaitForTxn, Equals, txns[j].StartTS())
+		c.Assert(entry.Key, BytesEquals, []byte{'k', byte(j)})
+		c.Assert(string(entry.ResourceGroupTag), Equals, fmt.Sprintf("tag-%v-%v", i, j))
+	}
+
+	c.Log("test case 1: 1->0->1")
+
+	txns := prepareTxns(2)
+
+	makeWaitFor(txns, 0, 1)
+	// Sleep for a while to make sure it has been blocked.
+	time.Sleep(time.Millisecond * 100)
+
+	// txn2 tries locking k1 and encounters deadlock error.
+	err := tryLock(txns, 1, 0)
+	c.Assert(err, NotNil)
+	dl, ok := errors.Cause(err).(*tikverr.ErrDeadlock)
+	c.Assert(ok, IsTrue)
+
+	waitChain := dl.GetWaitChain()
+	c.Assert(len(waitChain), Equals, 2)
+	checkWaitChainEntry(txns, waitChain[0], 0, 1)
+	checkWaitChainEntry(txns, waitChain[1], 1, 0)
+
+	// Each transaction should be rolled back after its blocker being rolled back
+	waitAndRollback(txns, 1)
+	waitAndRollback(txns, 0)
+
+	c.Log("test case 2: 3->2->0->1->3")
+	txns = prepareTxns(4)
+
+	makeWaitFor(txns, 0, 1)
+	makeWaitFor(txns, 2, 0)
+	makeWaitFor(txns, 1, 3)
+	// Sleep for a while to make sure it has been blocked.
+	time.Sleep(time.Millisecond * 100)
+
+	err = tryLock(txns, 3, 2)
+	c.Assert(err, NotNil)
+	dl, ok = errors.Cause(err).(*tikverr.ErrDeadlock)
+	c.Assert(ok, IsTrue)
+
+	waitChain = dl.GetWaitChain()
+	c.Assert(len(waitChain), Equals, 4)
+	c.Logf("wait chain: \n** %v\n**%v\n**%v\n**%v\n", waitChain[0], waitChain[1], waitChain[2], waitChain[3])
+	checkWaitChainEntry(txns, waitChain[0], 2, 0)
+	checkWaitChainEntry(txns, waitChain[1], 0, 1)
+	checkWaitChainEntry(txns, waitChain[2], 1, 3)
+	checkWaitChainEntry(txns, waitChain[3], 3, 2)
+
+	// Each transaction should be rolled back after its blocker being rolled back
+	waitAndRollback(txns, 3)
+	waitAndRollback(txns, 1)
+	waitAndRollback(txns, 0)
+	waitAndRollback(txns, 2)
+}
@@ -613,15 +613,18 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput
 		// If there is only 1 key and lock fails, no need to do pessimistic rollback.
 		if len(keys) > 1 || keyMayBeLocked {
 			wg := txn.asyncPessimisticRollback(ctx, keys)
-			if dl, ok := errors.Cause(err).(*tikverr.ErrDeadlock); ok && hashInKeys(dl.DeadlockKeyHash, keys) {
-				dl.IsRetryable = true
-				// Wait for the pessimistic rollback to finish before we retry the statement.
-				wg.Wait()
-				// Sleep a little, wait for the other transaction that blocked by this transaction to acquire the lock.
-				time.Sleep(time.Millisecond * 5)
-				failpoint.Inject("SingleStmtDeadLockRetrySleep", func() {
-					time.Sleep(300 * time.Millisecond)
-				})
+			if dl, ok := errors.Cause(err).(*tikverr.ErrDeadlock); ok {
+				logutil.Logger(ctx).Debug("deadlock error received", zap.Uint64("startTS", txn.startTS), zap.Stringer("deadlockInfo", dl))
+				if hashInKeys(dl.DeadlockKeyHash, keys) {
+					dl.IsRetryable = true
+					// Wait for the pessimistic rollback to finish before we retry the statement.
+					wg.Wait()
+					// Sleep a little, wait for the other transaction that blocked by this transaction to acquire the lock.
+					time.Sleep(time.Millisecond * 5)
+					failpoint.Inject("SingleStmtDeadLockRetrySleep", func() {
+						time.Sleep(300 * time.Millisecond)
+					})
+				}
 			}
 		}
 		if assignedPrimaryKey {

util/resourcegrouptag/resource_group_tag.go (new file, 85 lines)
@@ -0,0 +1,85 @@
+package resourcegrouptag
+
+import (
+	"encoding/hex"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/util/logutil"
+	"go.uber.org/zap"
+)
+
+const (
+	resourceGroupTagPrefixSQLDigest = byte(1)
+)
+
+// EncodeResourceGroupTag encodes sqlDigest into resource group tag.
+// A resource group tag can be carried in the Context field of TiKV requests, which is a byte array, and sent to TiKV as
+// diagnostic information. Currently it contains only the SQL Digest, and the codec method is naive but extendable.
+// This function doesn't return error. When there's some error, which can only be caused by unexpected format of the
+// arguments, it simply returns an empty result.
+// The format:
+// +-----------+-----------------------+----------------------------+---------------+----------------+----
+// | version=1 | field1 prefix (1byte) | field1 content (var bytes) | field2 prefix | field2 content | ...
+// +-----------+-----------------------+----------------------------+---------------+----------------+----
+// The `version` section marks the codec version, which makes it easier for changing the format in the future.
+// Each field starts with a byte to mark what field it is, and the length of the content depends on the field's
+// definition.
+// Currently there's only one field (SQL Digest), and its content starts with a byte `B` describing it's length, and
+// then follows by exactly `B` bytes.
+func EncodeResourceGroupTag(sqlDigest string) []byte {
+	if len(sqlDigest) == 0 {
+		return nil
+	}
+	if len(sqlDigest) >= 512 {
+		logutil.BgLogger().Warn("failed to encode sql digest to resource group tag: length too long", zap.String("sqlDigest", sqlDigest))
+		return nil
+	}
+
+	res := make([]byte, 3+len(sqlDigest)/2)
+
+	const encodingVersion = 1
+	res[0] = encodingVersion
+
+	res[1] = resourceGroupTagPrefixSQLDigest
+	// The SQL Digest is expected to be a hex string. Convert it back to bytes to save half of the memory.
+	res[2] = byte(len(sqlDigest) / 2)
+	_, err := hex.Decode(res[3:], []byte(sqlDigest))
+	if err != nil {
+		logutil.BgLogger().Warn("failed to encode sql digest to resource group tag: invalid hex string", zap.String("sqlDigest", sqlDigest))
+		return nil
+	}
+
+	return res
+}
+
+// DecodeResourceGroupTag decodes a resource group tag into various information contained in it. Currently it contains
+// only the SQL Digest.
+func DecodeResourceGroupTag(data []byte) (sqlDigest string, err error) {
+	if len(data) == 0 {
+		return "", nil
+	}
+
+	encodingVersion := data[0]
+	if encodingVersion != 1 {
+		return "", errors.Errorf("unsupported resource group tag version %v", data[0])
+	}
+	rem := data[1:]
+
+	for len(rem) > 0 {
+		switch rem[0] {
+		case resourceGroupTagPrefixSQLDigest:
+			// There must be one more byte at rem[1] to represent the content's length, and the remaining bytes should
+			// not be shorter than the length specified by rem[1].
+			if len(rem) < 2 || len(rem)-2 < int(rem[1]) {
+				return "", errors.Errorf("cannot parse resource group tag: field length mismatch, tag: %v", hex.EncodeToString(data))
+			}
+			fieldLen := int(rem[1])
+			sqlDigest = hex.EncodeToString(rem[2 : 2+fieldLen])
+			rem = rem[2+fieldLen:]
+		default:
+			return "", errors.Errorf("resource group tag field not recognized, prefix: %v, tag: %v", rem[0], hex.EncodeToString(data))
+		}
+	}
+
+	return
+}
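
As a companion to the format comment above, a small standalone sketch (assumed layout only, not the library code) that reproduces the documented encoding for a single SQL-digest field:

package main

import (
	"encoding/hex"
	"fmt"
)

// encodeSQLDigestTag follows the layout documented above: a version byte, then one
// field consisting of a prefix byte (1 = SQL digest), a length byte, and that many
// raw bytes decoded from the hex digest string.
func encodeSQLDigestTag(sqlDigest string) ([]byte, error) {
	raw, err := hex.DecodeString(sqlDigest) // the digest is hex; store it as raw bytes
	if err != nil {
		return nil, err
	}
	return append([]byte{1, 1, byte(len(raw))}, raw...), nil
}

func main() {
	tag, _ := encodeSQLDigestTag("ab12")
	// Prints: 01 01 02 ab 12 — which DecodeResourceGroupTag would decode back to "ab12".
	fmt.Printf("% x\n", tag)
}
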

util/resourcegrouptag/resource_group_tag_test.go (new file, 111 lines)
@@ -0,0 +1,111 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package resourcegrouptag
+
+import (
+	"math/rand"
+	"testing"
+
+	. "github.com/pingcap/check"
+)
+
+type testUtilsSuite struct{}
+
+var _ = Suite(&testUtilsSuite{})
+
+func TestT(t *testing.T) {
+	TestingT(t)
+}
+
+func (s *testUtilsSuite) TestResourceGroupTagEncoding(c *C) {
+	sqlDigest := ""
+	tag := EncodeResourceGroupTag(sqlDigest)
+	c.Assert(len(tag), Equals, 0)
+	decodedSQLDigest, err := DecodeResourceGroupTag(tag)
+	c.Assert(err, IsNil)
+	c.Assert(len(decodedSQLDigest), Equals, 0)
+
+	sqlDigest = "aa"
+	tag = EncodeResourceGroupTag(sqlDigest)
+	// version(1) + prefix(1) + length(1) + content(2hex -> 1byte)
+	c.Assert(len(tag), Equals, 4)
+	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, IsNil)
+	c.Assert(decodedSQLDigest, Equals, sqlDigest)
+
+	sqlDigest = genRandHex(64)
+	tag = EncodeResourceGroupTag(sqlDigest)
+	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, IsNil)
+	c.Assert(decodedSQLDigest, Equals, sqlDigest)
+
+	sqlDigest = genRandHex(510)
+	tag = EncodeResourceGroupTag(sqlDigest)
+	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, IsNil)
+	c.Assert(decodedSQLDigest, Equals, sqlDigest)
+
+	// The max supported length is 255 bytes (510 hex digits).
+	sqlDigest = genRandHex(512)
+	tag = EncodeResourceGroupTag(sqlDigest)
+	c.Assert(len(tag), Equals, 0)
+
+	// A hex string can't have odd length.
+	sqlDigest = genRandHex(15)
+	tag = EncodeResourceGroupTag(sqlDigest)
+	c.Assert(len(tag), Equals, 0)
+
+	// Non-hexadecimal character is invalid
+	sqlDigest = "aabbccddgg"
+	tag = EncodeResourceGroupTag(sqlDigest)
+	c.Assert(len(tag), Equals, 0)
+
+	// A tag should start with a supported version
+	tag = []byte("\x00")
+	_, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, NotNil)
+
+	// The fields should have format like `[prefix, length, content...]`, otherwise decoding it should returns error.
+	tag = []byte("\x01\x01")
+	_, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, NotNil)
+
+	tag = []byte("\x01\x01\x02")
+	_, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, NotNil)
+
+	tag = []byte("\x01\x01\x02AB")
+	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, IsNil)
+	c.Assert(decodedSQLDigest, Equals, "4142")
+
+	tag = []byte("\x01\x01\x00")
+	decodedSQLDigest, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, IsNil)
+	c.Assert(len(decodedSQLDigest), Equals, 0)
+
+	// Unsupported field
+	tag = []byte("\x01\x99")
+	_, err = DecodeResourceGroupTag(tag)
+	c.Assert(err, NotNil)
+}
+
+func genRandHex(length int) string {
+	const chars = "0123456789abcdef"
+	res := make([]byte, length)
+	for i := 0; i < length; i++ {
+		res[i] = chars[rand.Intn(len(chars))]
+	}
+	return string(res)
+}