From c9ff8458bcc07792d55196ea5ba0e28ec6a59ee1 Mon Sep 17 00:00:00 2001
From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com>
Date: Sun, 10 Jan 2021 11:41:27 +0800
Subject: [PATCH] store/tikv: Add more failpoints about transaction (#22160)

Signed-off-by: MyonKeminta
---
 store/tikv/2pc.go            | 45 ++++++++++++++++++++-
 store/tikv/pessimistic.go    | 24 ++++++++++++
 store/tikv/prewrite.go       | 27 ++++++++++++-
 store/tikv/region_request.go | 76 ++++++++++++++++++++++++------------
 4 files changed, 146 insertions(+), 26 deletions(-)

diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go
index 0ab4fa1aaf..2186142f19 100644
--- a/store/tikv/2pc.go
+++ b/store/tikv/2pc.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"encoding/hex"
 	"math"
+	"math/rand"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -741,6 +742,20 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
 	if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() {
 		secondaryBo := NewBackofferWithVars(context.Background(), int(atomic.LoadUint64(&CommitMaxBackoff)), c.txn.vars)
 		go func() {
+			if c.connID > 0 {
+				failpoint.Inject("beforeCommitSecondaries", func(v failpoint.Value) {
+					if s, ok := v.(string); !ok {
+						logutil.Logger(bo.ctx).Info("[failpoint] sleep 2s before commit secondary keys",
+							zap.Uint64("connID", c.connID), zap.Uint64("txnStartTS", c.startTS), zap.Uint64("txnCommitTS", c.commitTS))
+						time.Sleep(2 * time.Second)
+					} else if s == "skip" {
+						logutil.Logger(bo.ctx).Info("[failpoint] injected skip committing secondaries",
+							zap.Uint64("connID", c.connID), zap.Uint64("txnStartTS", c.startTS), zap.Uint64("txnCommitTS", c.commitTS))
+						failpoint.Return()
+					}
+				})
+			}
+
 			e := c.doActionOnBatches(secondaryBo, action, batchBuilder.allBatches())
 			if e != nil {
 				logutil.BgLogger().Debug("2PC async doActionOnBatches",
@@ -1009,6 +1024,13 @@ func (c *twoPhaseCommitter) checkOnePCFallBack(action twoPhaseCommitAction, batc
 func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
 	c.cleanWg.Add(1)
 	go func() {
+		failpoint.Inject("commitFailedSkipCleanup", func() {
+			logutil.Logger(ctx).Info("[failpoint] injected skip cleanup secondaries on failure",
+				zap.Uint64("txnStartTS", c.startTS))
+			c.cleanWg.Done()
+			failpoint.Return()
+		})
+
 		cleanupKeysCtx := context.WithValue(context.Background(), txnStartKey, ctx.Value(txnStartKey))
 		err := c.cleanupMutations(NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations)
 		if err != nil {
@@ -1239,7 +1261,24 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
 	}
 
 	if c.connID > 0 {
-		failpoint.Inject("beforeCommit", func() {})
+		failpoint.Inject("beforeCommit", func(val failpoint.Value) {
+			// Pass multiple instructions in one string, delimited by commas, to trigger multiple behaviors, like
+			// `return("delay,fail")`. They will then be executed in sequence.
+			if v, ok := val.(string); ok {
+				for _, action := range strings.Split(v, ",") {
+					// Async commit transactions cannot return an error here, since the commit is already successful.
+					if action == "fail" && !c.isAsyncCommit() {
+						logutil.Logger(ctx).Info("[failpoint] injected failure before commit", zap.Uint64("txnStartTS", c.startTS))
+						failpoint.Return(errors.New("injected failure before commit"))
+					} else if action == "delay" {
+						duration := time.Duration(rand.Int63n(int64(time.Second) * 5))
+						logutil.Logger(ctx).Info("[failpoint] injected delay before commit",
+							zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", duration))
+						time.Sleep(duration)
+					}
+				}
+			}
+		})
 	}
 
 	if c.isAsyncCommit() {
@@ -1604,6 +1643,10 @@ func newBatched(primaryKey []byte) *batched {
 // appendBatchMutationsBySize appends mutations to b. It may split the keys to make
 // sure each batch's size does not exceed the limit.
 func (b *batched) appendBatchMutationsBySize(region RegionVerID, mutations CommitterMutations, sizeFn func(k, v []byte) int, limit int) {
+	failpoint.Inject("twoPCRequestBatchSizeLimit", func() {
+		limit = 1
+	})
+
 	var start, end int
 	for start = 0; start < mutations.Len(); start = end {
 		var size int
diff --git a/store/tikv/pessimistic.go b/store/tikv/pessimistic.go
index a3cb5105c7..1c51a43d2e 100644
--- a/store/tikv/pessimistic.go
+++ b/store/tikv/pessimistic.go
@@ -14,6 +14,8 @@
 package tikv
 
 import (
+	"math/rand"
+	"strings"
 	"sync/atomic"
 	"time"
 
@@ -23,7 +25,9 @@ import (
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/metrics"
 	"github.com/pingcap/tidb/store/tikv/tikvrpc"
+	"github.com/pingcap/tidb/util/logutil"
 	"github.com/prometheus/client_golang/prometheus"
+	"go.uber.org/zap"
 )
 
 type actionPessimisticLock struct {
@@ -218,6 +222,26 @@ func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *Bac
 }
 
 func (c *twoPhaseCommitter) pessimisticLockMutations(bo *Backoffer, lockCtx *kv.LockCtx, mutations CommitterMutations) error {
+	if c.connID > 0 {
+		failpoint.Inject("beforePessimisticLock", func(val failpoint.Value) {
+			// Pass multiple instructions in one string, delimited by commas, to trigger multiple behaviors, like
+			// `return("delay,fail")`. They will then be executed in sequence.
+			if v, ok := val.(string); ok {
+				for _, action := range strings.Split(v, ",") {
+					if action == "delay" {
+						duration := time.Duration(rand.Int63n(int64(time.Second) * 5))
+						logutil.Logger(bo.ctx).Info("[failpoint] injected delay at pessimistic lock",
+							zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", duration))
+						time.Sleep(duration)
+					} else if action == "fail" {
+						logutil.Logger(bo.ctx).Info("[failpoint] injected failure at pessimistic lock",
+							zap.Uint64("txnStartTS", c.startTS))
+						failpoint.Return(errors.New("injected failure at pessimistic lock"))
+					}
+				}
+			}
+		})
+	}
 	return c.doActionOnMutations(bo, actionPessimisticLock{lockCtx}, mutations)
 }
 
diff --git a/store/tikv/prewrite.go b/store/tikv/prewrite.go
index bc1780f343..c9432bcaf6 100644
--- a/store/tikv/prewrite.go
+++ b/store/tikv/prewrite.go
@@ -14,6 +14,7 @@
 package tikv
 
 import (
+	"encoding/hex"
 	"math"
 	"sync/atomic"
 	"time"
@@ -72,11 +73,25 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
 		}
 	})
 
+	ttl := c.lockTTL
+
+	if c.connID > 0 {
+		failpoint.Inject("twoPCShortLockTTL", func() {
+			ttl = 1
+			keys := make([]string, 0, len(mutations))
+			for _, m := range mutations {
+				keys = append(keys, hex.EncodeToString(m.Key))
+			}
+			logutil.BgLogger().Info("[failpoint] injected lock ttl = 1 on prewrite",
+				zap.Uint64("txnStartTS", c.startTS), zap.Strings("keys", keys))
+		})
+	}
+
 	req := &pb.PrewriteRequest{
 		Mutations:         mutations,
 		PrimaryLock:       c.primary(),
 		StartVersion:      c.startTS,
-		LockTtl:           c.lockTTL,
+		LockTtl:           ttl,
 		IsPessimisticLock: isPessimisticLock,
 		ForUpdateTs:       c.forUpdateTS,
 		TxnSize:           txnSize,
@@ -111,6 +126,16 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
 	// regions. It invokes `prewriteMutations` recursively here, and the number of batches will be
 	// checked there.
 
+	if c.connID > 0 {
+		failpoint.Inject("prewritePrimaryFail", func() {
+			if batch.isPrimary {
+				logutil.Logger(bo.ctx).Info("[failpoint] injected error on prewriting primary batch",
+					zap.Uint64("txnStartTS", c.startTS))
+				failpoint.Return(errors.New("injected error on prewriting primary batch"))
+			}
+		})
+	}
+
 	txnSize := uint64(c.regionTxnSize[batch.region.id])
 	// When we retry because of a region miss, we don't know the transaction size. We set the transaction size here
 	// to MaxUint64 to avoid unexpected "resolve lock lite".
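The failpoints added above are all gated on c.connID > 0 and are driven through the usual failpoint control interface. Below is a minimal sketch of how a test could exercise the comma-delimited instruction string accepted by `beforeCommit` (and `beforePessimisticLock`); it assumes the `github.com/pingcap/failpoint` Enable/Disable API, that the failpoint path is the `github.com/pingcap/tidb/store/tikv` package path plus the failpoint name, and that the binary was built with failpoints enabled -- none of which is shown in this patch.

package tikv_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

// TestInjectedCommitFailure is a hypothetical test name; the failpoint path
// below assumes the Inject call lives in github.com/pingcap/tidb/store/tikv.
func TestInjectedCommitFailure(t *testing.T) {
	fp := "github.com/pingcap/tidb/store/tikv/beforeCommit"
	// "delay,fail": sleep a random duration (up to 5s), then return an injected
	// error right before the commit phase, matching how the loop above parses it.
	if err := failpoint.Enable(fp, `return("delay,fail")`); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable(fp); err != nil {
			t.Fatal(err)
		}
	}()

	// ... start a session with connID > 0, write some keys, and assert that
	// COMMIT returns the injected "injected failure before commit" error ...
}

Packing several instructions into one comma-delimited string lets a single Enable call combine behaviors; `return("delay")` alone would only add the random sleep, and `return("skip")` on `beforeCommitSecondaries` skips committing secondary keys without failing the transaction.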
diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go
index c88e65af3c..65c9c5e773 100644
--- a/store/tikv/region_request.go
+++ b/store/tikv/region_request.go
@@ -34,6 +34,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/metrics"
+	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/store/tikv/tikvrpc"
 	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/logutil"
@@ -420,34 +421,61 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, rpcCtx *RPCContext,
 		ctx, cancel = rawHook.(*RPCCanceller).WithCancel(ctx)
 		defer cancel()
 	}
-	start := time.Now()
-	resp, err = s.client.SendRequest(ctx, rpcCtx.Addr, req, timeout)
-	if s.Stats != nil {
-		recordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start))
-		failpoint.Inject("tikvStoreRespResult", func(val failpoint.Value) {
-			if val.(bool) {
-				if req.Type == tikvrpc.CmdCop && bo.totalSleep == 0 {
-					failpoint.Return(&tikvrpc.Response{
-						Resp: &coprocessor.Response{RegionError: &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}},
-					}, false, nil)
-				}
-			}
+
+	var connID uint64
+	if v := bo.ctx.Value(sessionctx.ConnID); v != nil {
+		connID = v.(uint64)
+	}
+
+	injectFailOnSend := false
+	if connID > 0 {
+		failpoint.Inject("rpcFailOnSend", func() {
+			logutil.Logger(ctx).Info("[failpoint] injected RPC error on send", zap.Stringer("type", req.Type),
+				zap.Stringer("req", req.Req.(fmt.Stringer)), zap.Stringer("ctx", &req.Context))
+			injectFailOnSend = true
+			err = errors.New("injected RPC error on send")
 		})
 	}
-	failpoint.Inject("rpcContextCancelErr", func(val failpoint.Value) {
-		if val.(bool) {
-			ctx1, cancel := context.WithCancel(context.Background())
-			cancel()
-			select {
-			case <-ctx1.Done():
-			}
-
-			ctx = ctx1
-			err = ctx.Err()
-			resp = nil
+
+	if !injectFailOnSend {
+		start := time.Now()
+		resp, err = s.client.SendRequest(ctx, rpcCtx.Addr, req, timeout)
+		if s.Stats != nil {
+			recordRegionRequestRuntimeStats(s.Stats, req.Type, time.Since(start))
+			failpoint.Inject("tikvStoreRespResult", func(val failpoint.Value) {
+				if val.(bool) {
+					if req.Type == tikvrpc.CmdCop && bo.totalSleep == 0 {
+						failpoint.Return(&tikvrpc.Response{
+							Resp: &coprocessor.Response{RegionError: &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}},
+						}, false, nil)
+					}
+				}
+			})
 		}
-	})
+
+		if connID > 0 {
+			failpoint.Inject("rpcFailOnRecv", func() {
+				logutil.Logger(ctx).Info("[failpoint] injected RPC error on recv", zap.Stringer("type", req.Type),
+					zap.Stringer("req", req.Req.(fmt.Stringer)), zap.Stringer("ctx", &req.Context))
+				err = errors.New("injected RPC error on recv")
+				resp = nil
+			})
+		}
+
+		failpoint.Inject("rpcContextCancelErr", func(val failpoint.Value) {
+			if val.(bool) {
+				ctx1, cancel := context.WithCancel(context.Background())
+				cancel()
+				select {
+				case <-ctx1.Done():
+				}
+
+				ctx = ctx1
+				err = ctx.Err()
+				resp = nil
+			}
+		})
+	}
 	if err != nil {
 		s.rpcError = err
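Unlike the transaction-level failpoints, which check c.connID directly, the RPC-level rpcFailOnSend/rpcFailOnRecv injections recover the connection ID from the Backoffer's context, so only requests issued on behalf of a real client connection are affected. A rough sketch of supplying that ID follows; the helper name is illustrative and it only assumes that sessionctx.ConnID (the context key read by the new code) takes a uint64 value.

package tikv_test

import (
	"context"

	"github.com/pingcap/tidb/sessionctx"
)

// withConnID is an illustrative helper, not part of this patch. sendReqToRegion
// reads sessionctx.ConnID from the Backoffer's context; only when the stored
// value is a non-zero uint64 do rpcFailOnSend/rpcFailOnRecv inject their errors.
func withConnID(ctx context.Context, connID uint64) context.Context {
	return context.WithValue(ctx, sessionctx.ConnID, connID)
}

The failpoints themselves are switched on the same way as in the earlier sketch (failpoint.Enable with the store/tikv path); requests whose context carries no connection ID keep going through the normal send path untouched.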