sessionctx/binloginfo: add a timeout for writing binlog (#6587)
This commit is contained in:
@ -229,6 +229,7 @@ type TiKVClient struct {
|
||||
// Binlog is the config for binlog.
|
||||
type Binlog struct {
|
||||
BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"`
|
||||
WriteTimeout string `toml:"write-timeout" json:"write-timeout"`
|
||||
// If IgnoreError is true, when writting binlog meets error, TiDB would
|
||||
// ignore the error.
|
||||
IgnoreError bool `toml:"ignore-error" json:"ignore-error"`
|
||||
@ -305,6 +306,9 @@ var defaultConf = Config{
|
||||
GrpcConnectionCount: 16,
|
||||
CommitTimeout: "41s",
|
||||
},
|
||||
Binlog: Binlog{
|
||||
WriteTimeout: "15s",
|
||||
},
|
||||
}
|
||||
|
||||
var globalConf = defaultConf
|
||||
|
||||
@ -223,6 +223,9 @@ capacity = 1024000
|
||||
# Socket file to write binlog.
|
||||
binlog-socket = ""
|
||||
|
||||
# WriteTimeout specifies how long it will wait for writing binlog to pump.
|
||||
write-timeout = "15s"
|
||||
|
||||
# If IgnoreError is true, when writting binlog meets error, TiDB would stop writting binlog,
|
||||
# but still provide service.
|
||||
ignore-error = false
|
||||
|
||||
@ -35,6 +35,8 @@ func init() {
|
||||
grpc.EnableTracing = false
|
||||
}
|
||||
|
||||
var binlogWriteTimeout = 15 * time.Second
|
||||
|
||||
// pumpClient is the gRPC client to write binlog, it is opened on server start and never close,
|
||||
// shared by all sessions.
|
||||
var pumpClient binlog.PumpClient
|
||||
@ -61,6 +63,15 @@ func SetPumpClient(client binlog.PumpClient) {
|
||||
pumpClientLock.Unlock()
|
||||
}
|
||||
|
||||
// SetGRPCTimeout sets grpc timeout for writing binlog.
|
||||
func SetGRPCTimeout(timeout time.Duration) {
|
||||
if timeout < 300*time.Millisecond {
|
||||
log.Warnf("set binlog grpc timeout %s ignored, use default value %s", timeout, binlogWriteTimeout)
|
||||
return // Avoid invalid value
|
||||
}
|
||||
binlogWriteTimeout = timeout
|
||||
}
|
||||
|
||||
// GetPrewriteValue gets binlog prewrite value in the context.
|
||||
func GetPrewriteValue(ctx sessionctx.Context, createIfNotExists bool) *binlog.PrewriteValue {
|
||||
vars := ctx.GetSessionVars()
|
||||
@ -109,7 +120,9 @@ func (info *BinlogInfo) WriteBinlog(clusterID uint64) error {
|
||||
// Retry many times because we may raise CRITICAL error here.
|
||||
for i := 0; i < 20; i++ {
|
||||
var resp *binlog.WriteBinlogResp
|
||||
resp, err = info.Client.WriteBinlog(context.Background(), req)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), binlogWriteTimeout)
|
||||
resp, err = info.Client.WriteBinlog(ctx, req)
|
||||
cancel()
|
||||
if err == nil && resp.Errmsg != "" {
|
||||
err = errors.New(resp.Errmsg)
|
||||
}
|
||||
|
||||
@ -176,6 +176,7 @@ func setupBinlogClient() {
|
||||
if cfg.Binlog.IgnoreError {
|
||||
binloginfo.SetIgnoreError(true)
|
||||
}
|
||||
binloginfo.SetGRPCTimeout(parseDuration(cfg.Binlog.WriteTimeout))
|
||||
binloginfo.SetPumpClient(binlog.NewPumpClient(clientConn))
|
||||
log.Infof("created binlog client at %s, ignore error %v", cfg.Binlog.BinlogSocket, cfg.Binlog.IgnoreError)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user