store/tikv: support single statement rollback for pessimistic transaction (#10654)
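At a high level, this change lets a pessimistic transaction roll back and retry only the statement that failed (on a write conflict or a retryable deadlock) instead of aborting the whole transaction. The sketch below is a simplified, self-contained model of that statement-level retry loop, not TiDB's actual implementation; the stmt type, errWriteConflict, and maxRetryCount are stand-ins for ExecStmt, kv.ErrWriteConflict, and config.PessimisticTxn.MaxRetryCount in the diff that follows.

package main

import (
	"errors"
	"fmt"
)

// stmt is a hypothetical stand-in for a statement inside a pessimistic
// transaction; forUpdateTS models the timestamp its locks are taken at.
type stmt struct {
	forUpdateTS uint64
	attempts    int
}

var errWriteConflict = errors.New("write conflict")

// run pretends to execute the statement: it conflicts while its forUpdateTS
// is older than the other transaction's commit timestamp.
func (s *stmt) run(conflictCommitTS uint64) error {
	s.attempts++
	if s.forUpdateTS < conflictCommitTS {
		return errWriteConflict
	}
	return nil
}

func main() {
	const maxRetryCount = 3 // stand-in for config.PessimisticTxn.MaxRetryCount
	s := &stmt{forUpdateTS: 10}
	conflictCommitTS := uint64(12)

	for retry := 0; ; retry++ {
		if err := s.run(conflictCommitTS); err == nil {
			break
		}
		if retry >= maxRetryCount {
			panic("pessimistic lock retry limit reached")
		}
		// Statement-level rollback: discard only this statement's changes and
		// locks, then retry with a forUpdateTS no older than the conflict.
		s.forUpdateTS = conflictCommitTS
	}
	fmt.Printf("statement succeeded after %d attempts\n", s.attempts)
}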
@@ -40,6 +40,7 @@ import (
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
@@ -427,40 +428,51 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
if err == nil {
return nil, nil
}
if !terror.ErrorEqual(kv.ErrWriteConflict, err) {
txnCtx := a.Ctx.GetSessionVars().TxnCtx
var newForUpdateTS uint64
if deadlock, ok := errors.Cause(err).(*tikv.ErrDeadlock); ok {
if !deadlock.IsRetryable {
return nil, ErrDeadlock
}
logutil.Logger(ctx).Info("single statement deadlock, retry statement",
zap.Uint64("txn", txnCtx.StartTS),
zap.Uint64("lockTS", deadlock.LockTs),
zap.Binary("lockKey", deadlock.LockKey),
zap.Uint64("deadlockKeyHash", deadlock.DeadlockKeyHash))
} else if terror.ErrorEqual(kv.ErrWriteConflict, err) {
conflictCommitTS := extractConflictCommitTS(err.Error())
if conflictCommitTS == 0 {
logutil.Logger(ctx).Warn("failed to extract conflictCommitTS from a conflict error")
}
forUpdateTS := txnCtx.GetForUpdateTS()
logutil.Logger(ctx).Info("pessimistic write conflict, retry statement",
zap.Uint64("txn", txnCtx.StartTS),
zap.Uint64("forUpdateTS", forUpdateTS),
zap.Uint64("conflictCommitTS", conflictCommitTS))
if conflictCommitTS > forUpdateTS {
newForUpdateTS = conflictCommitTS
}
} else {
return nil, err
}
if a.retryCount >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount {
return nil, errors.New("pessimistic lock retry limit reached")
}
a.retryCount++
conflictCommitTS := extractConflictCommitTS(err.Error())
if conflictCommitTS == 0 {
logutil.Logger(ctx).Warn("failed to extract conflictCommitTS from a conflict error")
}
sctx := a.Ctx
txnCtx := sctx.GetSessionVars().TxnCtx
forUpdateTS := txnCtx.GetForUpdateTS()
logutil.Logger(ctx).Info("pessimistic write conflict, retry statement",
zap.Uint64("txn", txnCtx.StartTS),
zap.Uint64("forUpdateTS", forUpdateTS),
zap.Uint64("conflictCommitTS", conflictCommitTS))
if conflictCommitTS > txnCtx.GetForUpdateTS() {
txnCtx.SetForUpdateTS(conflictCommitTS)
} else {
ts, err1 := sctx.GetStore().GetOracle().GetTimestamp(ctx)
if err1 != nil {
return nil, err1
if newForUpdateTS == 0 {
newForUpdateTS, err = a.Ctx.GetStore().GetOracle().GetTimestamp(ctx)
if err != nil {
return nil, err
}
txnCtx.SetForUpdateTS(ts)
}
txnCtx.SetForUpdateTS(newForUpdateTS)
e, err := a.buildExecutor()
if err != nil {
return nil, err
}
// Rollback the statement change before retry it.
sctx.StmtRollback()
sctx.GetSessionVars().StmtCtx.ResetForRetry()
a.Ctx.StmtRollback()
a.Ctx.GetSessionVars().StmtCtx.ResetForRetry()

if err = e.Open(ctx); err != nil {
return nil, err

@@ -51,6 +51,7 @@ var (
ErrBadDB = terror.ClassExecutor.New(mysql.ErrBadDB, mysql.MySQLErrName[mysql.ErrBadDB])
ErrWrongObject = terror.ClassExecutor.New(mysql.ErrWrongObject, mysql.MySQLErrName[mysql.ErrWrongObject])
ErrRoleNotGranted = terror.ClassPrivilege.New(mysql.ErrRoleNotGranted, mysql.MySQLErrName[mysql.ErrRoleNotGranted])
ErrDeadlock = terror.ClassExecutor.New(mysql.ErrLockDeadlock, mysql.MySQLErrName[mysql.ErrLockDeadlock])
)

func init() {
@@ -67,6 +68,7 @@ func init() {
mysql.ErrTableaccessDenied: mysql.ErrTableaccessDenied,
mysql.ErrBadDB: mysql.ErrBadDB,
mysql.ErrWrongObject: mysql.ErrWrongObject,
mysql.ErrLockDeadlock: mysql.ErrLockDeadlock,
}
terror.ErrClassToMySQLCodes[terror.ClassExecutor] = tableMySQLErrCodes
}

@@ -637,6 +637,7 @@ type SelectLockExec struct {
baseExecutor

Lock ast.SelectLockType
keys []kv.Key
}

// Open implements the Executor Open interface.
@@ -670,29 +671,24 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.RecordBatch) error
if len(e.Schema().TblID2Handle) == 0 || e.Lock != ast.SelectLockForUpdate {
return nil
}
if req.NumRows() != 0 {
iter := chunk.NewIterator4Chunk(req.Chunk)
for id, cols := range e.Schema().TblID2Handle {
for _, col := range cols {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(id, row.GetInt64(col.Index)))
}
}
}
return nil
}
// Lock keys only once when finished fetching all results.
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}
keys := make([]kv.Key, 0, req.NumRows())
iter := chunk.NewIterator4Chunk(req.Chunk)
forUpdateTS := e.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
for id, cols := range e.Schema().TblID2Handle {
for _, col := range cols {
keys = keys[:0]
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
keys = append(keys, tablecodec.EncodeRowKeyWithHandle(id, row.GetInt64(col.Index)))
}
if len(keys) == 0 {
continue
}
err = txn.LockKeys(ctx, forUpdateTS, keys...)
if err != nil {
return err
}
}
}
return nil
return txn.LockKeys(ctx, forUpdateTS, e.keys...)
}

// LimitExec represents limit executor

@@ -27,6 +27,8 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
)
@@ -194,3 +196,36 @@ func (s *testPessimisticSuite) TestDeadlock(c *C) {
c.Assert(int(e.Code()), Equals, mysql.ErrLockDeadlock)
syncCh <- struct{}{}
}

func (s *testPessimisticSuite) TestSingleStatementRollback(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("drop table if exists pessimistic")
tk.MustExec("create table single_statement (id int primary key, v int)")
tk.MustExec("insert into single_statement values (1, 1), (2, 1), (3, 1), (4, 1)")
tblID := tk.GetTableID("single_statement")
s.cluster.SplitTable(s.mvccStore, tblID, 2)
region1Key := codec.EncodeBytes(nil, tablecodec.EncodeRowKeyWithHandle(tblID, 1))
region1, _ := s.cluster.GetRegionByKey(region1Key)
region1ID := region1.Id
region2Key := codec.EncodeBytes(nil, tablecodec.EncodeRowKeyWithHandle(tblID, 3))
region2, _ := s.cluster.GetRegionByKey(region2Key)
region2ID := region2.Id

syncCh := make(chan bool)
go func() {
tk2.MustExec("begin pessimistic")
<-syncCh
s.cluster.ScheduleDelay(tk2.Se.GetSessionVars().TxnCtx.StartTS, region2ID, time.Millisecond*3)
tk2.MustExec("update single_statement set v = v + 1")
tk2.MustExec("commit")
<-syncCh
}()
tk.MustExec("begin pessimistic")
syncCh <- true
s.cluster.ScheduleDelay(tk.Se.GetSessionVars().TxnCtx.StartTS, region1ID, time.Millisecond*3)
tk.MustExec("update single_statement set v = v + 1")
tk.MustExec("commit")
syncCh <- true
}

@@ -163,9 +163,8 @@ func finishStmt(ctx context.Context, sctx sessionctx.Context, se *session, sessV
if !sessVars.InTxn() {
logutil.Logger(context.Background()).Info("rollbackTxn for ddl/autocommit error.")
se.RollbackTxn(ctx)
} else if se.txn.Valid() && se.txn.IsPessimistic() && strings.Contains(meetsErr.Error(), "deadlock") {
} else if se.txn.Valid() && se.txn.IsPessimistic() && executor.ErrDeadlock.Equal(meetsErr) {
logutil.Logger(context.Background()).Info("rollbackTxn for deadlock error", zap.Uint64("txn", se.txn.StartTS()))
meetsErr = errDeadlock
se.RollbackTxn(ctx)
}
return meetsErr
@@ -328,18 +327,15 @@ func IsQuery(sql string) bool {
var (
errForUpdateCantRetry = terror.ClassSession.New(codeForUpdateCantRetry,
mysql.MySQLErrName[mysql.ErrForUpdateCantRetry])
errDeadlock = terror.ClassSession.New(codeDeadlock, mysql.MySQLErrName[mysql.ErrLockDeadlock])
)

const (
codeForUpdateCantRetry terror.ErrCode = mysql.ErrForUpdateCantRetry
codeDeadlock terror.ErrCode = mysql.ErrLockDeadlock
)

func init() {
sessionMySQLErrCodes := map[terror.ErrCode]uint16{
codeForUpdateCantRetry: mysql.ErrForUpdateCantRetry,
codeDeadlock: mysql.ErrLockDeadlock,
}
terror.ErrClassToMySQLCodes[terror.ClassSession] = sessionMySQLErrCodes
}

@@ -18,6 +18,7 @@ import (
"context"
"math"
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
@@ -41,14 +42,24 @@ type Cluster struct {
id uint64
stores map[uint64]*Store
regions map[uint64]*Region

// delayEvents is used to control the execution sequence of rpc requests for test.
delayEvents map[delayKey]time.Duration
delayMu sync.Mutex
}

type delayKey struct {
startTS uint64
regionID uint64
}

// NewCluster creates an empty cluster. It needs to be bootstrapped before
// providing service.
func NewCluster() *Cluster {
return &Cluster{
stores: make(map[uint64]*Store),
regions: make(map[uint64]*Region),
stores: make(map[uint64]*Store),
regions: make(map[uint64]*Region),
delayEvents: make(map[delayKey]time.Duration),
}
}

@@ -347,6 +358,26 @@ func (c *Cluster) SplitKeys(mvccStore MVCCStore, start, end kv.Key, count int) {
c.splitRange(mvccStore, NewMvccKey(start), NewMvccKey(end), count)
}

// ScheduleDelay schedules a delay event for a transaction on a region.
func (c *Cluster) ScheduleDelay(startTS, regionID uint64, dur time.Duration) {
c.delayMu.Lock()
c.delayEvents[delayKey{startTS: startTS, regionID: regionID}] = dur
c.delayMu.Unlock()
}

func (c *Cluster) handleDelay(startTS, regionID uint64) {
key := delayKey{startTS: startTS, regionID: regionID}
c.delayMu.Lock()
dur, ok := c.delayEvents[key]
if ok {
delete(c.delayEvents, key)
}
c.delayMu.Unlock()
if ok {
time.Sleep(dur)
}
}

func (c *Cluster) splitRange(mvccStore MVCCStore, start, end MvccKey, count int) {
c.Lock()
defer c.Unlock()

@@ -61,7 +61,7 @@ func (e ErrAlreadyCommitted) Error() string {
return "txn already committed"
}

// ErrConflict is turned when the commitTS of key in the DB is greater than startTS.
// ErrConflict is returned when the commitTS of key in the DB is greater than startTS.
type ErrConflict struct {
StartTS uint64
ConflictTS uint64
@@ -71,3 +71,14 @@ type ErrConflict struct {
func (e *ErrConflict) Error() string {
return "write conflict"
}

// ErrDeadlock is returned when deadlock error is detected.
type ErrDeadlock struct {
LockTS uint64
LockKey []byte
DealockKeyHash uint64
}

func (e *ErrDeadlock) Error() string {
return "deadlock"
}

@@ -42,11 +42,12 @@ type mvccValue struct {
}

type mvccLock struct {
startTS uint64
primary []byte
value []byte
op kvrpcpb.Op
ttl uint64
startTS uint64
primary []byte
value []byte
op kvrpcpb.Op
ttl uint64
forUpdateTS uint64
}

type mvccEntry struct {
@@ -66,6 +67,7 @@ func (l *mvccLock) MarshalBinary() ([]byte, error) {
mh.WriteSlice(&buf, l.value)
mh.WriteNumber(&buf, l.op)
mh.WriteNumber(&buf, l.ttl)
mh.WriteNumber(&buf, l.forUpdateTS)
return buf.Bytes(), errors.Trace(mh.err)
}

@@ -78,6 +80,7 @@ func (l *mvccLock) UnmarshalBinary(data []byte) error {
mh.ReadSlice(buf, &l.value)
mh.ReadNumber(buf, &l.op)
mh.ReadNumber(buf, &l.ttl)
mh.ReadNumber(buf, &l.forUpdateTS)
return errors.Trace(mh.err)
}

@@ -429,7 +432,8 @@ type MVCCStore interface {
ReverseScan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair
PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS, forUpdateTS uint64, ttl uint64) []error
Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, startTS uint64, ttl uint64) []error
PessimisticRollback(keys [][]byte, startTS, forUpdateTS uint64) []error
Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, startTS, ttl uint64) []error
Commit(keys [][]byte, startTS, commitTS uint64) error
Rollback(keys [][]byte, startTS uint64) error
Cleanup(key []byte, startTS uint64) error

@@ -520,7 +520,7 @@ func (mvcc *MVCCLevelDB) PessimisticLock(mutations []*kvrpcpb.Mutation, primary
return errs
}
if err := mvcc.db.Write(batch, nil); err != nil {
return nil
return []error{err}
}

return errs
@@ -544,7 +544,11 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation
if dec.lock.startTS != startTS {
errDeadlock := mvcc.deadlockDetector.Detect(startTS, dec.lock.startTS, farm.Fingerprint64(mutation.Key))
if errDeadlock != nil {
return errDeadlock
return &ErrDeadlock{
LockKey: mutation.Key,
LockTS: dec.lock.startTS,
DealockKeyHash: errDeadlock.KeyHash,
}
}
return dec.lock.lockErr(mutation.Key)
}
@@ -555,10 +559,11 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation
}

lock := mvccLock{
startTS: startTS,
primary: primary,
op: kvrpcpb.Op_PessimisticLock,
ttl: ttl,
startTS: startTS,
primary: primary,
op: kvrpcpb.Op_PessimisticLock,
ttl: ttl,
forUpdateTS: forUpdateTS,
}
writeKey := mvccEncode(mutation.Key, lockVer)
writeValue, err := lock.MarshalBinary()
@@ -570,6 +575,53 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation
return nil
}

// PessimisticRollback implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) PessimisticRollback(keys [][]byte, startTS, forUpdateTS uint64) []error {
mvcc.mu.Lock()
defer mvcc.mu.Unlock()

anyError := false
batch := &leveldb.Batch{}
errs := make([]error, 0, len(keys))
for _, key := range keys {
err := pessimisticRollbackKey(mvcc.db, batch, key, startTS, forUpdateTS)
errs = append(errs, err)
if err != nil {
anyError = true
}
}
if anyError {
return errs
}
if err := mvcc.db.Write(batch, nil); err != nil {
return []error{err}
}
return errs
}

func pessimisticRollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS, forUpdateTS uint64) error {
startKey := mvccEncode(key, lockVer)
iter := newIterator(db, &util.Range{
Start: startKey,
})
defer iter.Release()

dec := lockDecoder{
expectKey: key,
}
ok, err := dec.Decode(iter)
if err != nil {
return errors.Trace(err)
}
if ok {
lock := dec.lock
if lock.op == kvrpcpb.Op_PessimisticLock && lock.startTS == startTS && lock.forUpdateTS <= forUpdateTS {
batch.Delete(startKey)
}
}
return nil
}

// Prewrite implements the MVCCStore interface.
func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, startTS uint64, ttl uint64) []error {
mvcc.mu.Lock()
@@ -607,7 +659,7 @@ func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte,
return errs
}
if err := mvcc.db.Write(batch, nil); err != nil {
return nil
return []error{err}
}

return errs

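A minimal, self-contained sketch of the guard in pessimisticRollbackKey above (types here are simplified stand-ins, not mocktikv's real ones): a rollback issued for (startTS, forUpdateTS) must not release a lock that the same transaction re-acquired later with a newer forUpdateTS, for example after a statement retry, so the lock is only deleted when all three conditions hold.

package main

import "fmt"

type lock struct {
	pessimistic bool
	startTS     uint64
	forUpdateTS uint64
}

// shouldRelease mirrors the condition above: same owner, still a pessimistic
// lock, and not taken at a newer forUpdateTS than the rollback covers.
func shouldRelease(l lock, startTS, forUpdateTS uint64) bool {
	return l.pessimistic && l.startTS == startTS && l.forUpdateTS <= forUpdateTS
}

func main() {
	current := lock{pessimistic: true, startTS: 10, forUpdateTS: 15}
	// A stale rollback for the earlier attempt (forUpdateTS 12) leaves the
	// newer lock alone; a rollback covering forUpdateTS 15 removes it.
	fmt.Println(shouldRelease(current, 10, 12)) // false
	fmt.Println(shouldRelease(current, 10, 15)) // true
}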
@@ -76,6 +76,15 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
},
}
}
if dead, ok := errors.Cause(err).(*ErrDeadlock); ok {
return &kvrpcpb.KeyError{
Deadlock: &kvrpcpb.Deadlock{
LockTs: dead.LockTS,
LockKey: dead.LockKey,
DeadlockKeyHash: dead.DealockKeyHash,
},
}
}
if retryable, ok := errors.Cause(err).(ErrRetryable); ok {
return &kvrpcpb.KeyError{
Retryable: retryable.Error(),
@@ -292,15 +301,42 @@ func (h *rpcHandler) handleKvPrewrite(req *kvrpcpb.PrewriteRequest) *kvrpcpb.Pre
func (h *rpcHandler) handleKvPessimisticLock(req *kvrpcpb.PessimisticLockRequest) *kvrpcpb.PessimisticLockResponse {
for _, m := range req.Mutations {
if !h.checkKeyInRegion(m.Key) {
panic("KvPrewrite: key not in region")
panic("KvPessimisticLock: key not in region")
}
}
startTS := req.StartVersion
regionID := req.Context.RegionId
h.cluster.handleDelay(startTS, regionID)
errs := h.mvccStore.PessimisticLock(req.Mutations, req.PrimaryLock, req.GetStartVersion(), req.GetForUpdateTs(), req.GetLockTtl())

// TODO: remove this when implement sever side wait.
h.simulateServerSideWaitLock(errs)
return &kvrpcpb.PessimisticLockResponse{
Errors: convertToKeyErrors(errs),
}
}

func (h *rpcHandler) simulateServerSideWaitLock(errs []error) {
for _, err := range errs {
if _, ok := err.(*ErrLocked); ok {
time.Sleep(time.Millisecond * 5)
break
}
}
}

func (h *rpcHandler) handleKvPessimisticRollback(req *kvrpcpb.PessimisticRollbackRequest) *kvrpcpb.PessimisticRollbackResponse {
for _, key := range req.Keys {
if !h.checkKeyInRegion(key) {
panic("KvPessimisticRollback: key not in region")
}
}
errs := h.mvccStore.PessimisticRollback(req.Keys, req.StartVersion, req.ForUpdateTs)
return &kvrpcpb.PessimisticRollbackResponse{
Errors: convertToKeyErrors(errs),
}
}

func (h *rpcHandler) handleKvCommit(req *kvrpcpb.CommitRequest) *kvrpcpb.CommitResponse {
for _, k := range req.Keys {
if !h.checkKeyInRegion(k) {
@@ -663,6 +699,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return resp, nil
}
resp.PessimisticLock = handler.handleKvPessimisticLock(r)
case tikvrpc.CmdPessimisticRollback:
r := req.PessimisticRollback
if err := handler.checkRequest(reqCtx, r.Size()); err != nil {
resp.PessimisticRollback = &kvrpcpb.PessimisticRollbackResponse{RegionError: err}
return resp, nil
}
resp.PessimisticRollback = handler.handleKvPessimisticRollback(r)
case tikvrpc.CmdCommit:
failpoint.Inject("rpcCommitResult", func(val failpoint.Value) {
switch val.(string) {

@@ -45,6 +45,7 @@ const (
actionCommit
actionCleanup
actionPessimisticLock
actionPessimisticRollback
)

var (
@@ -67,6 +68,8 @@ func (ca twoPhaseCommitAction) String() string {
return "cleanup"
case actionPessimisticLock:
return "pessimistic_lock"
case actionPessimisticRollback:
return "pessimistic_rollback"
}
return "unknown"
}
@@ -375,6 +378,8 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
singleBatchActionFunc = c.cleanupSingleBatch
case actionPessimisticLock:
singleBatchActionFunc = c.pessimisticLockSingleBatch
case actionPessimisticRollback:
singleBatchActionFunc = c.pessimisticRollbackSingleBatch
}
if len(batches) == 1 {
e := singleBatchActionFunc(bo, batches[0])
@@ -618,7 +623,7 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc
return errors.Trace(conditionPair.Err())
}
if deadlock := keyErr.Deadlock; deadlock != nil {
return errors.New("deadlock")
return &ErrDeadlock{Deadlock: deadlock}
}

// Extract lock from key error
@@ -628,16 +633,41 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc
}
locks = append(locks, lock)
}
msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, locks)
_, err = c.store.lockResolver.ResolveLocks(bo, locks)
if err != nil {
return errors.Trace(err)
}
if msBeforeExpired > 0 {
err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
// Because we already waited on tikv, no need to Backoff here.
}
}

func (c *twoPhaseCommitter) pessimisticRollbackSingleBatch(bo *Backoffer, batch batchKeys) error {
req := &tikvrpc.Request{
Type: tikvrpc.CmdPessimisticRollback,
PessimisticRollback: &pb.PessimisticRollbackRequest{
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
Keys: batch.keys,
},
}
for {
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return errors.Trace(err)
}
err = c.pessimisticRollbackKeys(bo, batch.keys)
return errors.Trace(err)
}
return nil
}
}

@@ -811,6 +841,10 @@ func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, keys [][]byte) er
return c.doActionOnKeys(bo, actionPessimisticLock, keys)
}

func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionPessimisticRollback, keys)
}

func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error {
err := c.execute(ctx)
if err != nil {

@@ -214,6 +214,8 @@ const (
scatterRegionBackoff = 20000
waitScatterRegionFinishBackoff = 120000
locateRegionMaxBackoff = 20000
pessimisticLockMaxBackoff = 10000
pessimisticRollbackMaxBackoff = 10000
)

// CommitMaxBackoff is max sleep time of the 'commit' command

@@ -15,6 +15,7 @@ package tikv

import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
)
@@ -39,6 +40,17 @@ var (
ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly])
)

// ErrDeadlock wraps *kvrpcpb.Deadlock to implement the error interface.
// It also marks if the deadlock is retryable.
type ErrDeadlock struct {
*kvrpcpb.Deadlock
IsRetryable bool
}

func (d *ErrDeadlock) Error() string {
return d.Deadlock.String()
}

func init() {
tikvMySQLErrCodes := map[terror.ErrCode]uint16{
mysql.ErrTiKVServerTimeout: mysql.ErrTiKVServerTimeout,

@@ -45,6 +45,7 @@ const (
CmdGC
CmdDeleteRange
CmdPessimisticLock
CmdPessimisticRollback

CmdRawGet CmdType = 256 + iota
CmdRawBatchGet
@@ -77,6 +78,8 @@ func (t CmdType) String() string {
return "Prewrite"
case CmdPessimisticLock:
return "PessimisticLock"
case CmdPessimisticRollback:
return "PessimisticRollback"
case CmdCommit:
return "Commit"
case CmdCleanup:
@@ -134,7 +137,6 @@ type Request struct {
Get *kvrpcpb.GetRequest
Scan *kvrpcpb.ScanRequest
Prewrite *kvrpcpb.PrewriteRequest
PessimisticLock *kvrpcpb.PessimisticLockRequest
Commit *kvrpcpb.CommitRequest
Cleanup *kvrpcpb.CleanupRequest
BatchGet *kvrpcpb.BatchGetRequest
@@ -157,6 +159,9 @@ type Request struct {
MvccGetByStartTs *kvrpcpb.MvccGetByStartTsRequest
SplitRegion *kvrpcpb.SplitRegionRequest

PessimisticLock *kvrpcpb.PessimisticLockRequest
PessimisticRollback *kvrpcpb.PessimisticRollbackRequest

DebugGetRegionProperties *debugpb.GetRegionPropertiesRequest
}

@@ -205,6 +210,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: req.Cop}}
case CmdPessimisticLock:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticLock{PessimisticLock: req.PessimisticLock}}
case CmdPessimisticRollback:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticRollback{PessimisticRollback: req.PessimisticRollback}}
}
return nil
}
@@ -224,7 +231,6 @@ type Response struct {
Get *kvrpcpb.GetResponse
Scan *kvrpcpb.ScanResponse
Prewrite *kvrpcpb.PrewriteResponse
PessimisticLock *kvrpcpb.PessimisticLockResponse
Commit *kvrpcpb.CommitResponse
Cleanup *kvrpcpb.CleanupResponse
BatchGet *kvrpcpb.BatchGetResponse
@@ -248,6 +254,9 @@ type Response struct {
MvccGetByStartTS *kvrpcpb.MvccGetByStartTsResponse
SplitRegion *kvrpcpb.SplitRegionResponse

PessimisticLock *kvrpcpb.PessimisticLockResponse
PessimisticRollback *kvrpcpb.PessimisticRollbackResponse

DebugGetRegionProperties *debugpb.GetRegionPropertiesResponse
}

@@ -296,6 +305,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) *Resp
return &Response{Type: CmdCop, Cop: res.Coprocessor}
case *tikvpb.BatchCommandsResponse_Response_PessimisticLock:
return &Response{Type: CmdPessimisticLock, PessimisticLock: res.PessimisticLock}
case *tikvpb.BatchCommandsResponse_Response_PessimisticRollback:
return &Response{Type: CmdPessimisticRollback, PessimisticRollback: res.PessimisticRollback}
}
return nil
}
@@ -326,6 +337,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error {
req.Prewrite.Context = ctx
case CmdPessimisticLock:
req.PessimisticLock.Context = ctx
case CmdPessimisticRollback:
req.PessimisticRollback.Context = ctx
case CmdCommit:
req.Commit.Context = ctx
case CmdCleanup:
@@ -398,6 +411,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) {
resp.PessimisticLock = &kvrpcpb.PessimisticLockResponse{
RegionError: e,
}
case CmdPessimisticRollback:
resp.PessimisticRollback = &kvrpcpb.PessimisticRollbackResponse{
RegionError: e,
}
case CmdCommit:
resp.Commit = &kvrpcpb.CommitResponse{
RegionError: e,
@@ -504,6 +521,8 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) {
e = resp.Scan.GetRegionError()
case CmdPessimisticLock:
e = resp.PessimisticLock.GetRegionError()
case CmdPessimisticRollback:
e = resp.PessimisticRollback.GetRegionError()
case CmdPrewrite:
e = resp.Prewrite.GetRegionError()
case CmdCommit:
@@ -572,6 +591,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
resp.Prewrite, err = client.KvPrewrite(ctx, req.Prewrite)
case CmdPessimisticLock:
resp.PessimisticLock, err = client.KvPessimisticLock(ctx, req.PessimisticLock)
case CmdPessimisticRollback:
resp.PessimisticRollback, err = client.KVPessimisticRollback(ctx, req.PessimisticRollback)
case CmdCommit:
resp.Commit, err = client.KvCommit(ctx, req.Commit)
case CmdCleanup:

@@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/dgryski/go-farm"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
@@ -319,17 +320,16 @@ func (txn *tikvTxn) close() {
}

func (txn *tikvTxn) Rollback() error {
if !txn.valid {
return kv.ErrInvalidTxn
}
// Clean up pessimistic lock.
if txn.IsPessimistic() && txn.committer != nil {
err := txn.rollbackPessimisticLock()
err := txn.rollbackPessimisticLocks()
if err != nil {
logutil.Logger(context.Background()).Error(err.Error())
}
}

if !txn.valid {
return kv.ErrInvalidTxn
}
txn.close()
logutil.Logger(context.Background()).Debug("[kv] rollback txn", zap.Uint64("txnStartTS", txn.StartTS()))
tikvTxnCmdCountWithRollback.Inc()
@@ -337,16 +337,11 @@ func (txn *tikvTxn) Rollback() error {
return nil
}

func (txn *tikvTxn) rollbackPessimisticLock() error {
c := txn.committer
if err := c.initKeysAndMutations(); err != nil {
return errors.Trace(err)
}
if len(c.keys) == 0 {
func (txn *tikvTxn) rollbackPessimisticLocks() error {
if len(txn.lockKeys) == 0 {
return nil
}

return c.cleanupKeys(NewBackoffer(context.Background(), cleanupMaxBackoff), c.keys)
return txn.committer.pessimisticRollbackKeys(NewBackoffer(context.Background(), cleanupMaxBackoff), txn.lockKeys)
}

func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keys ...kv.Key) error {
@@ -370,7 +365,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keys ...kv
txn.committer.primaryKey = keys[0]
}

bo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(txn.vars)
bo := NewBackoffer(ctx, pessimisticLockMaxBackoff).WithVars(txn.vars)
keys1 := make([][]byte, len(keys))
for i, key := range keys {
keys1[i] = key
@@ -381,6 +376,14 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keys ...kv
txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys1) == 1
err := txn.committer.pessimisticLockKeys(bo, keys1)
if err != nil {
wg := txn.asyncPessimisticRollback(ctx, keys1)
if dl, ok := errors.Cause(err).(*ErrDeadlock); ok && hashInKeys(dl.DeadlockKeyHash, keys) {
dl.IsRetryable = true
// Wait for the pessimistic rollback to finish before we retry the statement.
wg.Wait()
// Sleep a little, wait for the other transaction that blocked by this transaction to acquire the lock.
time.Sleep(time.Millisecond * 5)
}
return err
}
}
@@ -393,6 +396,36 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keys ...kv
return nil
}

func (txn *tikvTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte) *sync.WaitGroup {
// Clone a new committer for execute in background.
committer := &twoPhaseCommitter{
store: txn.committer.store,
connID: txn.committer.connID,
startTS: txn.committer.startTS,
forUpdateTS: txn.committer.forUpdateTS,
primaryKey: txn.committer.primaryKey,
}
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
err := committer.pessimisticRollbackKeys(NewBackoffer(ctx, pessimisticRollbackMaxBackoff), keys)
if err != nil {
logutil.Logger(ctx).Warn("[kv] pessimisticRollback failed.", zap.Error(err))
}
wg.Done()
}()
return wg
}

func hashInKeys(deadlockKeyHash uint64, keys []kv.Key) bool {
for _, key := range keys {
if farm.Fingerprint64(key) == deadlockKeyHash {
return true
}
}
return false
}

func (txn *tikvTxn) IsReadOnly() bool {
return !txn.dirty
}

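One detail worth calling out from LockKeys above: a deadlock is only marked IsRetryable when the key hash reported in the deadlock error belongs to a key this statement itself tried to lock, because only then does rolling back the statement release the lock that closes the cycle; otherwise the deadlock involves locks taken by earlier statements and the whole transaction must abort (handlePessimisticLockError returns ErrDeadlock in that case). Below is a minimal, self-contained sketch of that check, using plain byte slices instead of kv.Key.

package main

import (
	"fmt"

	farm "github.com/dgryski/go-farm"
)

// hashInKeys mirrors the helper above, simplified to [][]byte: it reports
// whether the deadlock's key hash matches one of the keys this statement
// just tried to lock.
func hashInKeys(deadlockKeyHash uint64, keys [][]byte) bool {
	for _, key := range keys {
		if farm.Fingerprint64(key) == deadlockKeyHash {
			return true
		}
	}
	return false
}

func main() {
	keys := [][]byte{[]byte("k1"), []byte("k2")}
	fmt.Println(hashInKeys(farm.Fingerprint64([]byte("k2")), keys)) // true: retry just the statement
	fmt.Println(hashInKeys(farm.Fingerprint64([]byte("k3")), keys)) // false: abort the transaction
}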
@@ -42,11 +42,11 @@ func NewDetector() *Detector {

// ErrDeadlock is returned when deadlock is detected.
type ErrDeadlock struct {
keyHash uint64
KeyHash uint64
}

func (e *ErrDeadlock) Error() string {
return fmt.Sprintf("deadlock(%d)", e.keyHash)
return fmt.Sprintf("deadlock(%d)", e.KeyHash)
}

// Detect detects deadlock for the sourceTxn on a locked key.
@@ -67,7 +67,7 @@ func (d *Detector) doDetect(sourceTxn, waitForTxn uint64) *ErrDeadlock {
}
for _, nextTarget := range list.txns {
if nextTarget.txn == sourceTxn {
return &ErrDeadlock{keyHash: nextTarget.keyHash}
return &ErrDeadlock{KeyHash: nextTarget.keyHash}
}
if err := d.doDetect(sourceTxn, nextTarget.txn); err != nil {
return err

@@ -23,6 +23,8 @@ import (

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util/sqlexec"
@@ -262,3 +264,12 @@ func (tk *TestKit) ResultSetToResultWithCtx(ctx context.Context, rs sqlexec.Reco
func Rows(args ...string) [][]interface{} {
return testutil.RowsWithSep(" ", args...)
}

// GetTableID gets table ID by name.
func (tk *TestKit) GetTableID(tableName string) int64 {
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr(tableName))
tk.c.Assert(err, check.IsNil)
return tbl.Meta().ID
}