From e8631bf387c8bc25f39d495447b258758b110b59 Mon Sep 17 00:00:00 2001 From: lysu Date: Fri, 25 Oct 2019 11:02:00 +0800 Subject: [PATCH] tikv: make requests fail-fast for dial timeouted (#12819) --- store/tikv/client_batch.go | 71 +++++++++++++++++++++++++++----------- store/tikv/region_cache.go | 7 ++++ 2 files changed, 57 insertions(+), 21 deletions(-) diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index d10ca55b68..fe2dc76c42 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" ) type batchConn struct { @@ -217,6 +218,17 @@ func (c *batchCommandsClient) send(request *tikvpb.BatchCommandsRequest, entries for i, requestID := range request.RequestIds { c.batched.Store(requestID, entries[i]) } + + if err := c.initBatchClient(); err != nil { + logutil.BgLogger().Warn( + "init create streaming fail", + zap.String("target", c.target), + zap.Error(err), + ) + c.failPendingRequests(err) + return + } + if err := c.client.Send(request); err != nil { logutil.BgLogger().Info( "sending batch commands meets error", @@ -248,19 +260,40 @@ func (c *batchCommandsClient) failPendingRequests(err error) { }) } -func (c *batchCommandsClient) reCreateStreamingClientOnce(err error) error { - c.failPendingRequests(err) // fail all pending requests. - // Re-establish a application layer stream. TCP layer is handled by gRPC. - tikvClient := tikvpb.NewTikvClient(c.conn) - streamClient, err := tikvClient.BatchCommands(context.TODO()) - if err == nil { - logutil.BgLogger().Info( - "batchRecvLoop re-create streaming success", - zap.String("target", c.target), - ) - c.client = streamClient +func (c *batchCommandsClient) waitConnReady() (err error) { + dialCtx, cancel := context.WithTimeout(context.Background(), dialTimeout) + for { + s := c.conn.GetState() + if s == connectivity.Ready { + cancel() + break + } + if !c.conn.WaitForStateChange(dialCtx, s) { + cancel() + err = dialCtx.Err() + return + } + } + return +} - return nil +func (c *batchCommandsClient) reCreateStreamingClientOnce(perr error) error { + c.failPendingRequests(perr) // fail all pending requests. + + err := c.waitConnReady() + // Re-establish a application layer stream. TCP layer is handled by gRPC. + if err == nil { + tikvClient := tikvpb.NewTikvClient(c.conn) + var streamClient tikvpb.Tikv_BatchCommandsClient + streamClient, err = tikvClient.BatchCommands(context.TODO()) + if err == nil { + logutil.BgLogger().Info( + "batchRecvLoop re-create streaming success", + zap.String("target", c.target), + ) + c.client = streamClient + return nil + } } logutil.BgLogger().Info( "batchRecvLoop re-create streaming fail", @@ -457,15 +490,6 @@ func (a *batchConn) getClientAndSend(entries []*batchCommandsEntry, requests []* RequestIds: requestIDs, } - if err := cli.initBatchClient(); err != nil { - logutil.BgLogger().Warn( - "init create streaming fail", - zap.String("target", cli.target), - zap.Error(err), - ) - return - } - cli.send(req, entries) return } @@ -474,6 +498,11 @@ func (c *batchCommandsClient) initBatchClient() error { if c.client != nil { return nil } + + if err := c.waitConnReady(); err != nil { + return err + } + // Initialize batch streaming clients. tikvClient := tikvpb.NewTikvClient(c.conn) streamClient, err := tikvClient.BatchCommands(context.TODO()) diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 996eb6327f..cc3df77340 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -25,6 +25,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/google/btree" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" pd "github.com/pingcap/pd/client" "github.com/pingcap/tidb/kv" @@ -312,6 +313,12 @@ func (c *RegionCache) GetTiKVRPCContext(bo *Backoffer, id RegionVerID, replicaRe if err != nil { return nil, err } + // enable by `curl -XPUT -d '1*return("[some-addr]")->return("")' http://host:port/github.com/pingcap/tidb/store/tikv/injectWrongStoreAddr` + failpoint.Inject("injectWrongStoreAddr", func(val failpoint.Value) { + if a, ok := val.(string); ok && len(a) > 0 { + addr = a + } + }) if store == nil || len(addr) == 0 { // Store not found, region must be out of date. cachedRegion.invalidate()