diff --git a/store/tikv/client.go b/store/tikv/client.go index 0de1b543cb..6b8f69aa44 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -133,13 +133,16 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { } logutil.Logger(context.Background()).Error("batchRecvLoop error when receive", zap.Error(err)) - // Hold the lock to forbid batchSendLoop using the old client. - c.clientLock.Lock() - c.failPendingRequests(err) // fail all pending requests. - for { // try to re-create the streaming in the loop. + for { // try to re-create the streaming in the loop. + // Hold the lock to forbid batchSendLoop using the old client. + c.clientLock.Lock() + c.failPendingRequests(err) // fail all pending requests. + // Re-establish a application layer stream. TCP layer is handled by gRPC. tikvClient := tikvpb.NewTikvClient(c.conn) streamClient, err := tikvClient.BatchCommands(context.TODO()) + c.clientLock.Unlock() + if err == nil { logutil.Logger(context.Background()).Info("batchRecvLoop re-create streaming success") c.client = streamClient @@ -149,7 +152,6 @@ func (c *batchCommandsClient) batchRecvLoop(cfg config.TiKVClient) { // TODO: Use a more smart backoff strategy. time.Sleep(time.Second) } - c.clientLock.Unlock() continue }