store: fix data race about KVStore.tikvClient (#24655)
This commit is contained in:
@ -48,7 +48,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
|
||||
CommitVersion: c.commitTS,
|
||||
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
|
||||
|
||||
sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
|
||||
sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient())
|
||||
resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutShort)
|
||||
|
||||
// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
|
||||
|
||||
@ -66,10 +66,13 @@ var oracleUpdateInterval = 2000
|
||||
|
||||
// KVStore contains methods to interact with a TiKV cluster.
|
||||
type KVStore struct {
|
||||
clusterID uint64
|
||||
uuid string
|
||||
oracle oracle.Oracle
|
||||
client Client
|
||||
clusterID uint64
|
||||
uuid string
|
||||
oracle oracle.Oracle
|
||||
clientMu struct {
|
||||
sync.RWMutex
|
||||
client Client
|
||||
}
|
||||
pdClient pd.Client
|
||||
regionCache *RegionCache
|
||||
lockResolver *LockResolver
|
||||
@ -133,7 +136,6 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client
|
||||
clusterID: pdClient.GetClusterID(context.TODO()),
|
||||
uuid: uuid,
|
||||
oracle: o,
|
||||
client: reqCollapse{client},
|
||||
pdClient: pdClient,
|
||||
regionCache: NewRegionCache(pdClient),
|
||||
kv: spkv,
|
||||
@ -142,6 +144,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client
|
||||
closed: make(chan struct{}),
|
||||
replicaReadSeed: rand.Uint32(),
|
||||
}
|
||||
store.clientMu.client = reqCollapse{client}
|
||||
store.lockResolver = newLockResolver(store)
|
||||
|
||||
go store.runSafePointChecker()
|
||||
@ -205,7 +208,7 @@ func (s *KVStore) Close() error {
|
||||
s.pdClient.Close()
|
||||
|
||||
close(s.closed)
|
||||
if err := s.client.Close(); err != nil {
|
||||
if err := s.GetTiKVClient().Close(); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -312,7 +315,7 @@ func (s *KVStore) SupportDeleteRange() (supported bool) {
|
||||
|
||||
// SendReq sends a request to region.
|
||||
func (s *KVStore) SendReq(bo *Backoffer, req *tikvrpc.Request, regionID RegionVerID, timeout time.Duration) (*tikvrpc.Response, error) {
|
||||
sender := NewRegionRequestSender(s.regionCache, s.client)
|
||||
sender := NewRegionRequestSender(s.regionCache, s.GetTiKVClient())
|
||||
return sender.SendReq(bo, req, regionID, timeout)
|
||||
}
|
||||
|
||||
@ -343,12 +346,16 @@ func (s *KVStore) SetOracle(oracle oracle.Oracle) {
|
||||
|
||||
// SetTiKVClient resets the client instance.
|
||||
func (s *KVStore) SetTiKVClient(client Client) {
|
||||
s.client = client
|
||||
s.clientMu.Lock()
|
||||
defer s.clientMu.Unlock()
|
||||
s.clientMu.client = client
|
||||
}
|
||||
|
||||
// GetTiKVClient gets the client instance.
|
||||
func (s *KVStore) GetTiKVClient() (client Client) {
|
||||
return s.client
|
||||
s.clientMu.RLock()
|
||||
defer s.clientMu.RUnlock()
|
||||
return s.clientMu.client
|
||||
}
|
||||
|
||||
func (s *KVStore) getSafeTS(storeID uint64) uint64 {
|
||||
|
||||
@ -157,7 +157,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
|
||||
|
||||
req := c.buildPrewriteRequest(batch, txnSize)
|
||||
for {
|
||||
sender := NewRegionRequestSender(c.store.regionCache, c.store.client)
|
||||
sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient())
|
||||
resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutShort)
|
||||
|
||||
// If we fail to receive response for async commit prewrite, it will be undetermined whether this
|
||||
|
||||
@ -164,7 +164,7 @@ func (s *Scanner) getData(bo *Backoffer) error {
|
||||
zap.String("nextEndKey", kv.StrKey(s.nextEndKey)),
|
||||
zap.Bool("reverse", s.reverse),
|
||||
zap.Uint64("txnStartTS", s.startTS()))
|
||||
sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.client)
|
||||
sender := NewRegionRequestSender(s.snapshot.store.regionCache, s.snapshot.store.GetTiKVClient())
|
||||
var reqEndKey, reqStartKey []byte
|
||||
var loc *KeyLocation
|
||||
var err error
|
||||
|
||||
@ -123,7 +123,7 @@ func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool
|
||||
Priority: kvrpcpb.CommandPri_Normal,
|
||||
})
|
||||
|
||||
sender := NewRegionRequestSender(s.regionCache, s.client)
|
||||
sender := NewRegionRequestSender(s.regionCache, s.GetTiKVClient())
|
||||
resp, err := sender.SendReq(bo, req, batch.regionID, ReadTimeoutShort)
|
||||
|
||||
batchResp := singleBatchResp{resp: resp}
|
||||
|
||||
Reference in New Issue
Block a user