diff --git a/store/tikv/client.go b/store/tikv/client.go index 2463527224..3316de6b39 100644 --- a/store/tikv/client.go +++ b/store/tikv/client.go @@ -80,21 +80,23 @@ type connArray struct { streamTimeout chan *tikvrpc.Lease // batchConn is not null when batch is enabled. *batchConn + done chan struct{} } -func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32, done <-chan struct{}) (*connArray, error) { +func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32) (*connArray, error) { a := &connArray{ index: 0, v: make([]*grpc.ClientConn, maxSize), streamTimeout: make(chan *tikvrpc.Lease, 1024), + done: make(chan struct{}), } - if err := a.Init(addr, security, idleNotify, done); err != nil { + if err := a.Init(addr, security, idleNotify); err != nil { return nil, err } return a, nil } -func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, done <-chan struct{}) error { +func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32) error { a.target = addr opt := grpc.WithInsecure() @@ -162,7 +164,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint a.batchCommandsClients = append(a.batchCommandsClients, batchClient) } } - go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, done) + go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, a.done) if allowBatch { go a.batchSendLoop(cfg.TiKVClient) } @@ -187,6 +189,8 @@ func (a *connArray) Close() { a.v[i] = nil } } + + close(a.done) } // rpcClient is RPC client struct. @@ -195,7 +199,6 @@ func (a *connArray) Close() { // that there are too many concurrent requests which overload the service of TiKV. type rpcClient struct { sync.RWMutex - done chan struct{} conns map[string]*connArray security config.Security @@ -208,7 +211,6 @@ type rpcClient struct { func newRPCClient(security config.Security) *rpcClient { return &rpcClient{ - done: make(chan struct{}, 1), conns: make(map[string]*connArray), security: security, } @@ -244,7 +246,7 @@ func (c *rpcClient) createConnArray(addr string) (*connArray, error) { if !ok { var err error connCount := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount - array, err = newConnArray(connCount, addr, c.security, &c.idleNotify, c.done) + array, err = newConnArray(connCount, addr, c.security, &c.idleNotify) if err != nil { return nil, err } @@ -352,7 +354,6 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R func (c *rpcClient) Close() error { // TODO: add a unit test for SendRequest After Closed - close(c.done) c.closeConns() return nil }