*: bump client-go to enable async-batch-get (#62294)

close pingcap/tidb#62293
This commit is contained in:
zyguan
2025-08-01 14:56:16 +08:00
committed by GitHub
parent d4c3c72144
commit d6153b2cc8
13 changed files with 75 additions and 9 deletions

View File

@ -7272,13 +7272,13 @@ def go_deps():
build_tags = ["nextgen"],
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "a911629d5e7c8421e2a69d9f47eb1ad6cb286ed0c13a5a8d201291d4b1739fd9",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20250707065624-97382455050a",
sha256 = "ae7575a9055def212e39121dc880c534f5ffd6a9a50057cf36cc754419ce816a",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20250708031306-557a4986e4c4",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250707065624-97382455050a.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250707065624-97382455050a.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250707065624-97382455050a.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250707065624-97382455050a.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250708031306-557a4986e4c4.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250708031306-557a4986e4c4.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250708031306-557a4986e4c4.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250708031306-557a4986e4c4.zip",
],
)
go_repository(

2
go.mod
View File

@ -112,7 +112,7 @@ require (
github.com/stretchr/testify v1.10.0
github.com/tdakkota/asciicheck v0.4.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20250707065624-97382455050a
github.com/tikv/client-go/v2 v2.0.8-0.20250708031306-557a4986e4c4
github.com/tikv/pd/client v0.0.0-20250703091733-dfd345b89500
github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67
github.com/twmb/murmur3 v1.1.6

4
go.sum
View File

@ -850,8 +850,8 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/tikv/client-go/v2 v2.0.8-0.20250707065624-97382455050a h1:5naxLKnWMWNStw2YF5gt0vvfwOTDh1tmsE+tuLNOzCU=
github.com/tikv/client-go/v2 v2.0.8-0.20250707065624-97382455050a/go.mod h1:SKB18UM0Wi6PTGydLwJuKPjZ3cBtRI/yMcpdt24SSI4=
github.com/tikv/client-go/v2 v2.0.8-0.20250708031306-557a4986e4c4 h1:EWm+6p9vA9fizcaHzt813nHRMqMnOAoPe1Ap7SEz9Lk=
github.com/tikv/client-go/v2 v2.0.8-0.20250708031306-557a4986e4c4/go.mod h1:SKB18UM0Wi6PTGydLwJuKPjZ3cBtRI/yMcpdt24SSI4=
github.com/tikv/pd/client v0.0.0-20250703091733-dfd345b89500 h1:ZUCeeEEU76I/jFUK3C9EV3dSSrtVZt3rwpHF6cT8V3k=
github.com/tikv/pd/client v0.0.0-20250703091733-dfd345b89500/go.mod h1:SicyvcZE0fzrGGWW3AEtZWWPRzGw/h5img4/6qiSYws=
github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 h1:9LPGD+jzxMlnk5r6+hJnar67cgpDIz/iyD+rfl5r2Vk=

View File

@ -342,6 +342,7 @@ func (c *Config) GetTiKVConfig() *tikvcfg.Config {
EnableForwarding: c.EnableForwarding,
TxnScope: c.Labels["zone"],
ZoneLabel: c.Labels["zone"],
EnableAsyncBatchGet: c.Performance.EnableAsyncBatchGet,
}
}
@ -749,6 +750,9 @@ type Performance struct {
// Deprecated: this config will not have any effect
ProjectionPushDown bool `toml:"projection-push-down" json:"projection-push-down"`
// EnableAsyncBatchGet indicates whether to use async API when sending batch-get requests.
EnableAsyncBatchGet bool `toml:"enable-async-batch-get" json:"enable-async-batch-get"`
}
// PlanCache is the PlanCache section of the config.
@ -1010,6 +1014,7 @@ var defaultConf = Config{
ForceInitStats: true,
// Deprecated: Stats are always initialized concurrently.
ConcurrentlyInitStats: true,
EnableAsyncBatchGet: true,
},
ProxyProtocol: ProxyProtocol{
Networks: "",

View File

@ -62,6 +62,7 @@ go_library(
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_client_go_v2//txnkv/txnsnapshot",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_client_go_v2//util/async",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//opt",
"@com_github_twmb_murmur3//:murmur3",
@ -106,6 +107,7 @@ go_test(
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util/async",
"@org_uber_go_goleak//:goleak",
"@org_uber_go_zap//:zap",
],

View File

@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util/async"
)
const (
@ -59,6 +60,17 @@ func (t *mockDetectClient) SendRequest(
return &tikvrpc.Response{Resp: &mpp.IsAliveResponse{Available: true}}, nil
}
func (t *mockDetectClient) SendRequestAsync(
ctx context.Context,
addr string,
req *tikvrpc.Request,
cb async.Callback[*tikvrpc.Response],
) {
go func() {
cb.Schedule(t.SendRequest(ctx, addr, req, tikv.ReadTimeoutMedium))
}()
}
func (t *mockDetectClient) SetEventListener(_ tikv.ClientEventListener) {}
type ProbeTest map[string]*mockDetectClient

View File

@ -29,6 +29,7 @@ import (
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util/async"
)
type kvStore struct {
@ -73,6 +74,14 @@ func (c *tikvClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.
return res, derr.ToTiDBErr(err)
}
// SendRequestAsync sends Request asynchronously.
func (c *tikvClient) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) {
cb.Inject(func(res *tikvrpc.Response, err error) (*tikvrpc.Response, error) {
return res, derr.ToTiDBErr(err)
})
c.c.SendRequestAsync(ctx, addr, req, cb)
}
func (c *tikvClient) SetEventListener(listener tikv.ClientEventListener) {
c.c.SetEventListener(listener)
}

View File

@ -22,6 +22,7 @@ go_library(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_client_go_v2//util/async",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@com_github_tikv_pd_client//opt",

View File

@ -40,6 +40,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
"github.com/tikv/client-go/v2/util/async"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
"github.com/tikv/pd/client/opt"
@ -459,3 +460,17 @@ func (c *injectTraceClient) SendRequest(ctx context.Context, addr string, req *t
}
return c.Client.SendRequest(ctx, addr, req, timeout)
}
// SendRequestAsync sends Request asynchronously.
func (c *injectTraceClient) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) {
if info := tracing.TraceInfoFromContext(ctx); info != nil {
source := req.Context.SourceStmt
if source == nil {
source = &kvrpcpb.SourceStmt{}
req.Context.SourceStmt = source
}
source.ConnectionId = info.ConnectionID
source.SessionAlias = info.SessionAlias
}
c.Client.SendRequestAsync(ctx, addr, req, cb)
}

View File

@ -27,6 +27,7 @@ go_library(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_client_go_v2//util/async",
"@com_github_tikv_pd_client//:client",
],
)

View File

@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/pkg/config"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util/async"
)
type clientRedirector struct {
@ -72,6 +73,17 @@ func (c *clientRedirector) SendRequest(ctx context.Context, addr string, req *ti
return c.mockClient.SendRequest(ctx, addr, req, timeout)
}
func (c *clientRedirector) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) {
if req.StoreTp == tikvrpc.TiDB {
c.Once.Do(func() {
c.rpcClient = tikv.NewRPCClient(tikv.WithSecurity(config.GetGlobalConfig().Security.ClusterSecurity()))
})
c.rpcClient.SendRequestAsync(ctx, addr, req, cb)
return
}
c.mockClient.SendRequestAsync(ctx, addr, req, cb)
}
func (c *clientRedirector) SetEventListener(listener tikv.ClientEventListener) {
c.mockClient.SetEventListener(listener)
}

View File

@ -40,6 +40,7 @@ go_library(
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util/async",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//clients/router",
"@com_github_tikv_pd_client//clients/tso",

View File

@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util/async"
"google.golang.org/grpc/metadata"
)
@ -60,6 +61,13 @@ var CheckResourceTagForTopSQLInGoTest bool
// UnistoreRPCClientSendHook exports for test.
var UnistoreRPCClientSendHook atomic.Pointer[func(*tikvrpc.Request)]
// SendRequestAsync sends a request to mock cluster asynchronously.
func (c *RPCClient) SendRequestAsync(ctx context.Context, addr string, req *tikvrpc.Request, cb async.Callback[*tikvrpc.Response]) {
go func() {
cb.Schedule(c.SendRequest(ctx, addr, req, tikv.ReadTimeoutMedium))
}()
}
// SendRequest sends a request to mock cluster.
func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
tikvrpc.AttachContext(req, req.Context)