store: refactor code to refine some static check issue (#13828)
This commit is contained in:
committed by
pingcap-github-bot
parent
093b27aea8
commit
236f5efcf8
@ -794,25 +794,23 @@ func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *Bac
|
||||
ForUpdateTs: c.forUpdateTS,
|
||||
Keys: batch.keys,
|
||||
})
|
||||
for {
|
||||
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if regionErr != nil {
|
||||
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = c.pessimisticRollbackKeys(bo, batch.keys)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
regionErr, err := resp.GetRegionError()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if regionErr != nil {
|
||||
err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String()))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = c.pessimisticRollbackKeys(bo, batch.keys)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getTxnPriority(txn *tikvTxn) pb.CommandPri {
|
||||
@ -835,8 +833,9 @@ func kvPriorityToCommandPri(pri int) pb.CommandPri {
|
||||
return pb.CommandPri_Low
|
||||
case kv.PriorityHigh:
|
||||
return pb.CommandPri_High
|
||||
default:
|
||||
return pb.CommandPri_Normal
|
||||
}
|
||||
return pb.CommandPri_Normal
|
||||
}
|
||||
|
||||
func (c *twoPhaseCommitter) setDetail(d *execdetails.CommitDetails) {
|
||||
|
||||
@ -35,6 +35,7 @@ import (
|
||||
"github.com/pingcap/tidb/store/tikv/tikvrpc"
|
||||
"github.com/pingcap/tidb/util/logutil"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/backoff"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
@ -132,7 +133,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
|
||||
grpc.WithUnaryInterceptor(unaryInterceptor),
|
||||
grpc.WithStreamInterceptor(streamInterceptor),
|
||||
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(MaxRecvMsgSize)),
|
||||
grpc.WithBackoffMaxDelay(time.Second*3),
|
||||
grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.Config{BaseDelay: time.Second * 3}}),
|
||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: time.Duration(keepAlive) * time.Second,
|
||||
Timeout: time.Duration(keepAliveTimeout) * time.Second,
|
||||
|
||||
@ -491,7 +491,6 @@ func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*
|
||||
}
|
||||
|
||||
cli.send(req, entries)
|
||||
return
|
||||
}
|
||||
|
||||
func (c *batchCommandsClient) initBatchClient() error {
|
||||
|
||||
@ -179,7 +179,7 @@ func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) {
|
||||
w.lastFinish = time.Now()
|
||||
if err != nil {
|
||||
logutil.Logger(ctx).Error("[gc worker] runGCJob", zap.Error(err))
|
||||
break
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
logutil.Logger(ctx).Info("[gc worker] quit", zap.String("uuid", w.uuid))
|
||||
|
||||
@ -358,7 +358,6 @@ func (t *txnExpireTime) update(lockExpire int64) {
|
||||
if lockExpire < t.txnExpire {
|
||||
t.txnExpire = lockExpire
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (t *txnExpireTime) value() int64 {
|
||||
|
||||
@ -267,12 +267,11 @@ type rangeTaskWorker struct {
|
||||
// run starts the worker. It collects all objects from `w.taskCh` and process them one by one.
|
||||
func (w *rangeTaskWorker) run(ctx context.Context, cancel context.CancelFunc) {
|
||||
defer w.wg.Done()
|
||||
|
||||
for r := range w.taskCh {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
w.err = ctx.Err()
|
||||
break
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
|
||||
@ -82,9 +82,7 @@ type RegionStore struct {
|
||||
// clone clones region store struct.
|
||||
func (r *RegionStore) clone() *RegionStore {
|
||||
storeFails := make([]uint32, len(r.stores))
|
||||
for i, e := range r.storeFails {
|
||||
storeFails[i] = e
|
||||
}
|
||||
copy(storeFails, r.storeFails)
|
||||
return &RegionStore{
|
||||
workTiFlashIdx: r.workTiFlashIdx,
|
||||
workTiKVIdx: r.workTiKVIdx,
|
||||
@ -1150,7 +1148,6 @@ retry:
|
||||
if !r.compareAndSwapStore(oldRegionStore, newRegionStore) {
|
||||
goto retry
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Contains checks whether the key is in the region, for the maximum region endKey is empty.
|
||||
@ -1304,7 +1301,6 @@ retryMarkResolved:
|
||||
if !s.compareAndSwapState(oldState, newState) {
|
||||
goto retryMarkResolved
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) getResolveState() resolveState {
|
||||
|
||||
@ -20,8 +20,8 @@ import (
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/failpoint"
|
||||
@ -218,7 +218,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err
|
||||
} else if atomic.LoadUint32(&ShuttingDown) > 0 {
|
||||
return errTiDBShuttingDown
|
||||
}
|
||||
if grpc.Code(errors.Cause(err)) == codes.Canceled {
|
||||
if status.Code(errors.Cause(err)) == codes.Canceled {
|
||||
select {
|
||||
case <-bo.ctx.Done():
|
||||
return errors.Trace(err)
|
||||
|
||||
@ -32,10 +32,7 @@ import (
|
||||
)
|
||||
|
||||
func equalRegionStartKey(key, regionStartKey []byte) bool {
|
||||
if bytes.Equal(key, regionStartKey) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
return bytes.Equal(key, regionStartKey)
|
||||
}
|
||||
|
||||
func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter bool) (*tikvrpc.Response, error) {
|
||||
|
||||
Reference in New Issue
Block a user