From d6153b2cc87a26fac55f2da76abb86ae2aa59a7b Mon Sep 17 00:00:00 2001 From: zyguan Date: Fri, 1 Aug 2025 14:56:16 +0800 Subject: [PATCH] *: bump client-go to enable async-batch-get (#62294) close pingcap/tidb#62293 --- DEPS.bzl | 12 ++++++------ go.mod | 2 +- go.sum | 4 ++-- pkg/config/config.go | 5 +++++ pkg/store/copr/BUILD.bazel | 2 ++ pkg/store/copr/mpp_probe_test.go | 12 ++++++++++++ pkg/store/copr/store.go | 9 +++++++++ pkg/store/driver/BUILD.bazel | 1 + pkg/store/driver/tikv_driver.go | 15 +++++++++++++++ pkg/store/mockstore/BUILD.bazel | 1 + pkg/store/mockstore/redirector.go | 12 ++++++++++++ pkg/store/mockstore/unistore/BUILD.bazel | 1 + pkg/store/mockstore/unistore/rpc.go | 8 ++++++++ 13 files changed, 75 insertions(+), 9 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index aac85ca4c3..01e435b4ab 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -7272,13 +7272,13 @@ def go_deps(): build_tags = ["nextgen"], build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sha256 = "a911629d5e7c8421e2a69d9f47eb1ad6cb286ed0c13a5a8d201291d4b1739fd9", - strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20250707065624-97382455050a", + sha256 = "ae7575a9055def212e39121dc880c534f5ffd6a9a50057cf36cc754419ce816a", + strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20250708031306-557a4986e4c4", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250707065624-97382455050a.zip", - "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250707065624-97382455050a.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250707065624-97382455050a.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250707065624-97382455050a.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250708031306-557a4986e4c4.zip", + "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250708031306-557a4986e4c4.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250708031306-557a4986e4c4.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250708031306-557a4986e4c4.zip", ], ) go_repository( diff --git a/go.mod b/go.mod index 33418cd68a..f3d64ba22e 100644 --- a/go.mod +++ b/go.mod @@ -112,7 +112,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/tdakkota/asciicheck v0.4.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.8-0.20250707065624-97382455050a + github.com/tikv/client-go/v2 v2.0.8-0.20250708031306-557a4986e4c4 github.com/tikv/pd/client v0.0.0-20250703091733-dfd345b89500 github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 github.com/twmb/murmur3 v1.1.6 diff --git a/go.sum b/go.sum index db6b1e4d52..4035fe6968 100644 --- a/go.sum +++ b/go.sum @@ -850,8 +850,8 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= -github.com/tikv/client-go/v2 v2.0.8-0.20250707065624-97382455050a h1:5naxLKnWMWNStw2YF5gt0vvfwOTDh1tmsE+tuLNOzCU= -github.com/tikv/client-go/v2 v2.0.8-0.20250707065624-97382455050a/go.mod h1:SKB18UM0Wi6PTGydLwJuKPjZ3cBtRI/yMcpdt24SSI4= +github.com/tikv/client-go/v2 v2.0.8-0.20250708031306-557a4986e4c4 h1:EWm+6p9vA9fizcaHzt813nHRMqMnOAoPe1Ap7SEz9Lk= +github.com/tikv/client-go/v2 v2.0.8-0.20250708031306-557a4986e4c4/go.mod h1:SKB18UM0Wi6PTGydLwJuKPjZ3cBtRI/yMcpdt24SSI4= github.com/tikv/pd/client v0.0.0-20250703091733-dfd345b89500 h1:ZUCeeEEU76I/jFUK3C9EV3dSSrtVZt3rwpHF6cT8V3k= github.com/tikv/pd/client v0.0.0-20250703091733-dfd345b89500/go.mod h1:SicyvcZE0fzrGGWW3AEtZWWPRzGw/h5img4/6qiSYws= github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 h1:9LPGD+jzxMlnk5r6+hJnar67cgpDIz/iyD+rfl5r2Vk= diff --git a/pkg/config/config.go b/pkg/config/config.go index f1ef2801a2..034d4552ab 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -342,6 +342,7 @@ func (c *Config) GetTiKVConfig() *tikvcfg.Config { EnableForwarding: c.EnableForwarding, TxnScope: c.Labels["zone"], ZoneLabel: c.Labels["zone"], + EnableAsyncBatchGet: c.Performance.EnableAsyncBatchGet, } } @@ -749,6 +750,9 @@ type Performance struct { // Deprecated: this config will not have any effect ProjectionPushDown bool `toml:"projection-push-down" json:"projection-push-down"` + + // EnableAsyncBatchGet indicates whether to use async API when sending batch-get requests. + EnableAsyncBatchGet bool `toml:"enable-async-batch-get" json:"enable-async-batch-get"` } // PlanCache is the PlanCache section of the config. @@ -1010,6 +1014,7 @@ var defaultConf = Config{ ForceInitStats: true, // Deprecated: Stats are always initialized concurrently. ConcurrentlyInitStats: true, + EnableAsyncBatchGet: true, }, ProxyProtocol: ProxyProtocol{ Networks: "", diff --git a/pkg/store/copr/BUILD.bazel b/pkg/store/copr/BUILD.bazel index 751809b788..017d18938c 100644 --- a/pkg/store/copr/BUILD.bazel +++ b/pkg/store/copr/BUILD.bazel @@ -62,6 +62,7 @@ go_library( "@com_github_tikv_client_go_v2//txnkv/txnlock", "@com_github_tikv_client_go_v2//txnkv/txnsnapshot", "@com_github_tikv_client_go_v2//util", + "@com_github_tikv_client_go_v2//util/async", "@com_github_tikv_pd_client//:client", "@com_github_tikv_pd_client//opt", "@com_github_twmb_murmur3//:murmur3", @@ -106,6 +107,7 @@ go_test( "@com_github_tikv_client_go_v2//testutils", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", + "@com_github_tikv_client_go_v2//util/async", "@org_uber_go_goleak//:goleak", "@org_uber_go_zap//:zap", ], diff --git a/pkg/store/copr/mpp_probe_test.go b/pkg/store/copr/mpp_probe_test.go index 22d365b06f..a6216471f9 100644 --- a/pkg/store/copr/mpp_probe_test.go +++ b/pkg/store/copr/mpp_probe_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/util/async" ) const ( @@ -59,6 +60,17 @@ func (t *mockDetectClient) SendRequest( return &tikvrpc.Response{Resp: &mpp.IsAliveResponse{Available: true}}, nil } +func (t *mockDetectClient) SendRequestAsync( + ctx context.Context, + addr string, + req *tikvrpc.Request, + cb async.Callback[*tikvrpc.Response], +) { + go func() { + cb.Schedule(t.SendRequest(ctx, addr, req, tikv.ReadTimeoutMedium)) + }() +} + func (t *mockDetectClient) SetEventListener(_ tikv.ClientEventListener) {} type ProbeTest map[string]*mockDetectClient diff --git a/pkg/store/copr/store.go b/pkg/store/copr/store.go index 556ccc2616..bfb635bcaf 100644 --- a/pkg/store/copr/store.go +++ b/pkg/store/copr/store.go @@ -29,6 +29,7 @@ import ( "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/util/async" ) type kvStore struct { @@ -73,6 +74,14 @@ func (c *tikvClient) SendRequest(ctx context.Context, addr string, req *tikvrpc. return res, derr.ToTiDBErr(err) } +// SendRequestAsync sends Request asynchronously. +func (c *tikvClient) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) { + cb.Inject(func(res *tikvrpc.Response, err error) (*tikvrpc.Response, error) { + return res, derr.ToTiDBErr(err) + }) + c.c.SendRequestAsync(ctx, addr, req, cb) +} + func (c *tikvClient) SetEventListener(listener tikv.ClientEventListener) { c.c.SetEventListener(listener) } diff --git a/pkg/store/driver/BUILD.bazel b/pkg/store/driver/BUILD.bazel index 3ec4eade7f..452b18ad9d 100644 --- a/pkg/store/driver/BUILD.bazel +++ b/pkg/store/driver/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//util", + "@com_github_tikv_client_go_v2//util/async", "@com_github_tikv_pd_client//:client", "@com_github_tikv_pd_client//http", "@com_github_tikv_pd_client//opt", diff --git a/pkg/store/driver/tikv_driver.go b/pkg/store/driver/tikv_driver.go index 86c2c0b28c..10188f4e3f 100644 --- a/pkg/store/driver/tikv_driver.go +++ b/pkg/store/driver/tikv_driver.go @@ -40,6 +40,7 @@ import ( "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" + "github.com/tikv/client-go/v2/util/async" pd "github.com/tikv/pd/client" pdhttp "github.com/tikv/pd/client/http" "github.com/tikv/pd/client/opt" @@ -459,3 +460,17 @@ func (c *injectTraceClient) SendRequest(ctx context.Context, addr string, req *t } return c.Client.SendRequest(ctx, addr, req, timeout) } + +// SendRequestAsync sends Request asynchronously. +func (c *injectTraceClient) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) { + if info := tracing.TraceInfoFromContext(ctx); info != nil { + source := req.Context.SourceStmt + if source == nil { + source = &kvrpcpb.SourceStmt{} + req.Context.SourceStmt = source + } + source.ConnectionId = info.ConnectionID + source.SessionAlias = info.SessionAlias + } + c.Client.SendRequestAsync(ctx, addr, req, cb) +} diff --git a/pkg/store/mockstore/BUILD.bazel b/pkg/store/mockstore/BUILD.bazel index 8053fd7497..bf73f97c92 100644 --- a/pkg/store/mockstore/BUILD.bazel +++ b/pkg/store/mockstore/BUILD.bazel @@ -27,6 +27,7 @@ go_library( "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//util", + "@com_github_tikv_client_go_v2//util/async", "@com_github_tikv_pd_client//:client", ], ) diff --git a/pkg/store/mockstore/redirector.go b/pkg/store/mockstore/redirector.go index 3a44466ee9..efc404af6e 100644 --- a/pkg/store/mockstore/redirector.go +++ b/pkg/store/mockstore/redirector.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/pkg/config" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/util/async" ) type clientRedirector struct { @@ -72,6 +73,17 @@ func (c *clientRedirector) SendRequest(ctx context.Context, addr string, req *ti return c.mockClient.SendRequest(ctx, addr, req, timeout) } +func (c *clientRedirector) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) { + if req.StoreTp == tikvrpc.TiDB { + c.Once.Do(func() { + c.rpcClient = tikv.NewRPCClient(tikv.WithSecurity(config.GetGlobalConfig().Security.ClusterSecurity())) + }) + c.rpcClient.SendRequestAsync(ctx, addr, req, cb) + return + } + c.mockClient.SendRequestAsync(ctx, addr, req, cb) +} + func (c *clientRedirector) SetEventListener(listener tikv.ClientEventListener) { c.mockClient.SetEventListener(listener) } diff --git a/pkg/store/mockstore/unistore/BUILD.bazel b/pkg/store/mockstore/unistore/BUILD.bazel index e1492212dd..aeabda6f14 100644 --- a/pkg/store/mockstore/unistore/BUILD.bazel +++ b/pkg/store/mockstore/unistore/BUILD.bazel @@ -40,6 +40,7 @@ go_library( "@com_github_tikv_client_go_v2//testutils", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", + "@com_github_tikv_client_go_v2//util/async", "@com_github_tikv_pd_client//:client", "@com_github_tikv_pd_client//clients/router", "@com_github_tikv_pd_client//clients/tso", diff --git a/pkg/store/mockstore/unistore/rpc.go b/pkg/store/mockstore/unistore/rpc.go index 688b9f407f..b12b958892 100644 --- a/pkg/store/mockstore/unistore/rpc.go +++ b/pkg/store/mockstore/unistore/rpc.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tidb/pkg/util/codec" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" + "github.com/tikv/client-go/v2/util/async" "google.golang.org/grpc/metadata" ) @@ -60,6 +61,13 @@ var CheckResourceTagForTopSQLInGoTest bool // UnistoreRPCClientSendHook exports for test. var UnistoreRPCClientSendHook atomic.Pointer[func(*tikvrpc.Request)] +// SendRequestAsync sends a request to mock cluster asynchronously. +func (c *RPCClient) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) { + go func() { + cb.Schedule(c.SendRequest(ctx, addr, req, tikv.ReadTimeoutMedium)) + }() +} + // SendRequest sends a request to mock cluster. func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { tikvrpc.AttachContext(req, req.Context)