store/tikv: fix CheckStreamTimeoutLoop goroutine leak (#13812)

This commit is contained in:
tiancaiamao
2019-12-05 17:54:10 +08:00
committed by pingcap-github-bot
parent 448af254ec
commit b1391ec9ee

View File

@ -80,21 +80,23 @@ type connArray struct {
streamTimeout chan *tikvrpc.Lease
// batchConn is not null when batch is enabled.
*batchConn
done chan struct{}
}
func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32, done <-chan struct{}) (*connArray, error) {
func newConnArray(maxSize uint, addr string, security config.Security, idleNotify *uint32) (*connArray, error) {
a := &connArray{
index: 0,
v: make([]*grpc.ClientConn, maxSize),
streamTimeout: make(chan *tikvrpc.Lease, 1024),
done: make(chan struct{}),
}
if err := a.Init(addr, security, idleNotify, done); err != nil {
if err := a.Init(addr, security, idleNotify); err != nil {
return nil, err
}
return a, nil
}
func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32, done <-chan struct{}) error {
func (a *connArray) Init(addr string, security config.Security, idleNotify *uint32) error {
a.target = addr
opt := grpc.WithInsecure()
@ -162,7 +164,7 @@ func (a *connArray) Init(addr string, security config.Security, idleNotify *uint
a.batchCommandsClients = append(a.batchCommandsClients, batchClient)
}
}
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, done)
go tikvrpc.CheckStreamTimeoutLoop(a.streamTimeout, a.done)
if allowBatch {
go a.batchSendLoop(cfg.TiKVClient)
}
@ -187,6 +189,8 @@ func (a *connArray) Close() {
a.v[i] = nil
}
}
close(a.done)
}
// rpcClient is RPC client struct.
@ -195,7 +199,6 @@ func (a *connArray) Close() {
// that there are too many concurrent requests which overload the service of TiKV.
type rpcClient struct {
sync.RWMutex
done chan struct{}
conns map[string]*connArray
security config.Security
@ -208,7 +211,6 @@ type rpcClient struct {
func newRPCClient(security config.Security) *rpcClient {
return &rpcClient{
done: make(chan struct{}, 1),
conns: make(map[string]*connArray),
security: security,
}
@ -244,7 +246,7 @@ func (c *rpcClient) createConnArray(addr string) (*connArray, error) {
if !ok {
var err error
connCount := config.GetGlobalConfig().TiKVClient.GrpcConnectionCount
array, err = newConnArray(connCount, addr, c.security, &c.idleNotify, c.done)
array, err = newConnArray(connCount, addr, c.security, &c.idleNotify)
if err != nil {
return nil, err
}
@ -352,7 +354,6 @@ func (c *rpcClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
func (c *rpcClient) Close() error {
// TODO: add a unit test for SendRequest After Closed
close(c.done)
c.closeConns()
return nil
}