tikv: make requests fail-fast for dial timeouted (#12819)
This commit is contained in:
@ -32,6 +32,7 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
)
|
||||
|
||||
type batchConn struct {
|
||||
@ -217,6 +218,17 @@ func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries
|
||||
for i, requestID := range request.RequestIds {
|
||||
c.batched.Store(requestID, entries[i])
|
||||
}
|
||||
|
||||
if err := c.initBatchClient(); err != nil {
|
||||
logutil.BgLogger().Warn(
|
||||
"init create streaming fail",
|
||||
zap.String("target", c.target),
|
||||
zap.Error(err),
|
||||
)
|
||||
c.failPendingRequests(err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := c.client.Send(request); err != nil {
|
||||
logutil.BgLogger().Info(
|
||||
"sending batch commands meets error",
|
||||
@ -248,19 +260,40 @@ func (c *batchCommandsClient) failPendingRequests(err error) {
|
||||
})
|
||||
}
|
||||
|
||||
func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) error {
|
||||
c.failPendingRequests(err) // fail all pending requests.
|
||||
// Re-establish a application layer stream. TCP layer is handled by gRPC.
|
||||
tikvClient := tikvpb.NewTikvClient(c.conn)
|
||||
streamClient, err := tikvClient.BatchCommands(context.TODO())
|
||||
if err == nil {
|
||||
logutil.BgLogger().Info(
|
||||
"batchRecvLoop re-create streaming success",
|
||||
zap.String("target", c.target),
|
||||
)
|
||||
c.client = streamClient
|
||||
func (c *batchCommandsClient) waitConnReady() (err error) {
|
||||
dialCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
|
||||
for {
|
||||
s := c.conn.GetState()
|
||||
if s == connectivity.Ready {
|
||||
cancel()
|
||||
break
|
||||
}
|
||||
if !c.conn.WaitForStateChange(dialCtx, s) {
|
||||
cancel()
|
||||
err = dialCtx.Err()
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
return nil
|
||||
func (c *batchCommandsClient) reCreateStreamingClientOnce(perr error) error {
|
||||
c.failPendingRequests(perr) // fail all pending requests.
|
||||
|
||||
err := c.waitConnReady()
|
||||
// Re-establish a application layer stream. TCP layer is handled by gRPC.
|
||||
if err == nil {
|
||||
tikvClient := tikvpb.NewTikvClient(c.conn)
|
||||
var streamClient tikvpb.Tikv_BatchCommandsClient
|
||||
streamClient, err = tikvClient.BatchCommands(context.TODO())
|
||||
if err == nil {
|
||||
logutil.BgLogger().Info(
|
||||
"batchRecvLoop re-create streaming success",
|
||||
zap.String("target", c.target),
|
||||
)
|
||||
c.client = streamClient
|
||||
return nil
|
||||
}
|
||||
}
|
||||
logutil.BgLogger().Info(
|
||||
"batchRecvLoop re-create streaming fail",
|
||||
@ -457,15 +490,6 @@ func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []*
|
||||
RequestIds: requestIDs,
|
||||
}
|
||||
|
||||
if err := cli.initBatchClient(); err != nil {
|
||||
logutil.BgLogger().Warn(
|
||||
"init create streaming fail",
|
||||
zap.String("target", cli.target),
|
||||
zap.Error(err),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
cli.send(req, entries)
|
||||
return
|
||||
}
|
||||
@ -474,6 +498,11 @@ func (c *batchCommandsClient) initBatchClient() error {
|
||||
if c.client != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := c.waitConnReady(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize batch streaming clients.
|
||||
tikvClient := tikvpb.NewTikvClient(c.conn)
|
||||
streamClient, err := tikvClient.BatchCommands(context.TODO())
|
||||
|
||||
@ -25,6 +25,7 @@ import (
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/google/btree"
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/failpoint"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
pd "github.com/pingcap/pd/client"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
@ -312,6 +313,12 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// enable by `curl -XPUT -d '1*return("[some-addr]")->return("")' http://host:port/github.com/pingcap/tidb/store/tikv/injectWrongStoreAddr`
|
||||
failpoint.Inject("injectWrongStoreAddr", func(val failpoint.Value) {
|
||||
if a, ok := val.(string); ok && len(a) > 0 {
|
||||
addr = a
|
||||
}
|
||||
})
|
||||
if store == nil || len(addr) == 0 {
|
||||
// Store not found, region must be out of date.
|
||||
cachedRegion.invalidate()
|
||||
|
||||
Reference in New Issue
Block a user