diff --git a/store/tikv/commit.go b/store/tikv/commit.go index 449081860c..10c60d9f6d 100644 --- a/store/tikv/commit.go +++ b/store/tikv/commit.go @@ -48,7 +48,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch CommitVersion: c.commitTS, }, pb.Context{Priority: c.priority, SyncLog: c.syncLog}) - sender := NewRegionRequestSender(c.store.regionCache, c.store.client) + sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient()) resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutShort) // If we fail to receive response for the request that commits primary key, it will be undetermined whether this diff --git a/store/tikv/kv.go b/store/tikv/kv.go index f61db4168e..bbf8517a42 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -66,10 +66,13 @@ var oracleUpdateInterval = 2000 // KVStore contains methods to interact with a TiKV cluster. type KVStore struct { - clusterID uint64 - uuid string - oracle oracle.Oracle - client Client + clusterID uint64 + uuid string + oracle oracle.Oracle + clientMu struct { + sync.RWMutex + client Client + } pdClient pd.Client regionCache *RegionCache lockResolver *LockResolver @@ -133,7 +136,6 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client clusterID: pdClient.GetClusterID(context.TODO()), uuid: uuid, oracle: o, - client: reqCollapse{client}, pdClient: pdClient, regionCache: NewRegionCache(pdClient), kv: spkv, @@ -142,6 +144,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client closed: make(chan struct{}), replicaReadSeed: rand.Uint32(), } + store.clientMu.client = reqCollapse{client} store.lockResolver = newLockResolver(store) go store.runSafePointChecker() @@ -205,7 +208,7 @@ func (s *KVStore) Close() error { s.pdClient.Close() close(s.closed) - if err := s.client.Close(); err != nil { + if err := s.GetTiKVClient().Close(); err != nil { return errors.Trace(err) } @@ -312,7 +315,7 @@ func (s *KVStore) SupportDeleteRange() (supported bool) { // SendReq sends a request to region. func (s *KVStore) SendReq(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) { - sender := NewRegionRequestSender(s.regionCache, s.client) + sender := NewRegionRequestSender(s.regionCache, s.GetTiKVClient()) return sender.SendReq(bo, req, regionID, timeout) } @@ -343,12 +346,16 @@ func (s *KVStore) SetOracle(oracle oracle.Oracle) { // SetTiKVClient resets the client instance. func (s *KVStore) SetTiKVClient(client Client) { - s.client = client + s.clientMu.Lock() + defer s.clientMu.Unlock() + s.clientMu.client = client } // GetTiKVClient gets the client instance. func (s *KVStore) GetTiKVClient() (client Client) { - return s.client + s.clientMu.RLock() + defer s.clientMu.RUnlock() + return s.clientMu.client } func (s *KVStore) getSafeTS(storeID uint64) uint64 { diff --git a/store/tikv/prewrite.go b/store/tikv/prewrite.go index 7097ba5dbc..ffb47e1fb4 100644 --- a/store/tikv/prewrite.go +++ b/store/tikv/prewrite.go @@ -157,7 +157,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff req := c.buildPrewriteRequest(batch, txnSize) for { - sender := NewRegionRequestSender(c.store.regionCache, c.store.client) + sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient()) resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutShort) // If we fail to receive response for async commit prewrite, it will be undetermined whether this diff --git a/store/tikv/scan.go b/store/tikv/scan.go index 6c43b7bdee..035291a783 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -164,7 +164,7 @@ func (s *Scanner) getData(bo *Backoffer) error { zap.String("nextEndKey", kv.StrKey(s.nextEndKey)), zap.Bool("reverse", s.reverse), zap.Uint64("txnStartTS", s.startTS())) - sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.client) + sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.GetTiKVClient()) var reqEndKey, reqStartKey []byte var loc *KeyLocation var err error diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 38ce24917d..c33a89efc1 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -123,7 +123,7 @@ func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool Priority: kvrpcpb.CommandPri_Normal, }) - sender := NewRegionRequestSender(s.regionCache, s.client) + sender := NewRegionRequestSender(s.regionCache, s.GetTiKVClient()) resp, err := sender.SendReq(bo, req, batch.regionID, ReadTimeoutShort) batchResp := singleBatchResp{resp: resp}