Files
tidb/config/config_handler.go

224 lines
6.6 KiB
Go

// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package config
import (
"bytes"
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/configpb"
"github.com/pingcap/pd/client"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
// ConfHandler is used to load and update config online.
// See https://github.com/pingcap/tidb/pull/13660 for more details.
//
// Two implementations live in this file: constantConfHandler, which always
// serves the config it was built with, and pdConfHandler, which periodically
// fetches updates from PD.
type ConfHandler interface {
	// Start begins whatever background work the implementation needs.
	Start()
	// Close stops whatever Start began.
	Close()
	// GetConfig returns the config currently held by the handler.
	GetConfig() *Config // read only
}
// ConfReloadFunc is used to reload the config to make it work.
// pdConfHandler calls it with the currently stored config and the newly
// fetched one, just before it stores the new config.
type ConfReloadFunc func(oldConf, newConf *Config)
// NewConfHandler creates a new ConfHandler according to the local config.
//
// When localConf.Store is "tikv" the returned handler registers with PD and
// keeps the config refreshed from it; for any other store it simply serves
// localConf unchanged. reloadFunc is handed to the PD-backed handler and is
// not used by the constant handler.
//
// On failure the returned ConfHandler is a plain nil interface value.
func NewConfHandler(localConf *Config, reloadFunc ConfReloadFunc) (ConfHandler, error) {
	// Dispatch on the config that was passed in, not on the package default.
	if localConf.Store != "tikv" {
		return &constantConfHandler{localConf}, nil
	}
	handler, err := newPDConfHandler(localConf, reloadFunc, nil)
	if err != nil {
		// Return an untyped nil: forwarding the nil *pdConfHandler directly
		// would produce a non-nil ConfHandler interface value.
		return nil, err
	}
	return handler, nil
}
// constantConfHandler is the ConfHandler used in local or debug environments.
// It never changes: GetConfig always hands back the config it was created with.
type constantConfHandler struct {
	conf *Config
}

// Start does nothing; there is no background work to launch.
func (*constantConfHandler) Start() {}

// Close does nothing; nothing was started.
func (*constantConfHandler) Close() {}

// GetConfig returns the config fixed at construction time.
func (h *constantConfHandler) GetConfig() *Config {
	return h.conf
}
const (
	// pdConfHandlerRefreshInterval is the interval newPDConfHandler assigns to
	// a handler: the wait between two fetches of the config from PD.
	pdConfHandlerRefreshInterval = 30 * time.Second
	// tidbComponentName is the component name passed to the PD config client
	// when registering and when fetching the config.
	tidbComponentName = "tidb"
)
// pdConfHandler is the ConfHandler that registers the local config with PD
// and then periodically fetches updates from it in a background goroutine.
type pdConfHandler struct {
	id string // ip:port
	// version is the config version last received from PD; it is sent with
	// every fetch and replaced after each successful update.
	version *configpb.Version
	// curConf holds the current *Config; read by GetConfig, written by run.
	curConf atomic.Value
	// interval is how long run waits between two fetches.
	interval time.Duration
	// wg lets Close wait for the run goroutine to finish.
	wg sync.WaitGroup
	// exit is closed by Close to tell run to return.
	exit chan struct{}
	// pdConfCli is the client used to fetch the config from PD.
	pdConfCli pd.ConfigClient
	// reloadFunc is called with the old and the new config before the new
	// config is stored.
	reloadFunc func(oldConf, newConf *Config)
}
// newPDConfHandler builds a pdConfHandler from the local config.
//
// It parses the PD addresses out of localConf.Path, creates a PD config
// client, registers the encoded local config with PD and takes the config PD
// answers with as the initial one. If registration fails, or the returned
// config cannot be decoded or fails validation, a copy of localConf is used
// instead and only a warning is logged.
//
// newPDCliFunc lets tests inject a client constructor; when it is nil,
// pd.NewConfigClient is used. An error is returned when the path cannot be
// parsed, the client cannot be created, or the local config cannot be
// encoded; in the last case the already created client is closed first.
func newPDConfHandler(localConf *Config, reloadFunc ConfReloadFunc,
	newPDCliFunc func([]string, pd.SecurityOption) (pd.ConfigClient, error), // for test
) (*pdConfHandler, error) {
	addresses, _, err := ParsePath(localConf.Path)
	if err != nil {
		return nil, err
	}

	// The handler identifies itself to PD as host:port, preferring the
	// advertised address when one is configured.
	host := localConf.Host
	if localConf.AdvertiseAddress != "" {
		host = localConf.AdvertiseAddress
	}
	id := fmt.Sprintf("%v:%v", host, localConf.Port)

	security := localConf.Security
	if newPDCliFunc == nil {
		newPDCliFunc = pd.NewConfigClient
	}
	pdCli, err := newPDCliFunc(addresses, pd.SecurityOption{
		CAPath:   security.ClusterSSLCA,
		CertPath: security.ClusterSSLCert,
		KeyPath:  security.ClusterSSLKey,
	})
	if err != nil {
		return nil, err
	}

	confContent, err := encodeConfig(localConf)
	if err != nil {
		// The client is not handed to anyone on this path, so release it here.
		pdCli.Close()
		return nil, err
	}

	// register to PD and get the new default config.
	// see https://github.com/pingcap/tidb/pull/13660 for more details.
	// suppose port and security config items cannot be changed online.
	status, version, conf, err := pdCli.Create(context.Background(), new(configpb.Version), tidbComponentName, id, confContent)
	if err != nil {
		logutil.Logger(context.Background()).Warn("register the config to PD error, local config will be used", zap.Error(err))
	} else if status.Code != configpb.StatusCode_OK && status.Code != configpb.StatusCode_WRONG_VERSION {
		logutil.Logger(context.Background()).Warn("invalid status when registering the config to PD", zap.String("code", status.Code.String()), zap.String("errmsg", status.Message))
		// Discard whatever content came back with the unexpected status.
		conf = ""
	}

	tmpConf := *localConf // use the local config if the remote config is invalid
	newConf := &tmpConf
	if conf != "" {
		newConf, err = decodeConfig(conf)
		if err != nil {
			logutil.Logger(context.Background()).Warn("decode remote config error", zap.Error(err))
			newConf = &tmpConf
		} else if err := newConf.Valid(); err != nil {
			logutil.Logger(context.Background()).Warn("invalid remote config", zap.Error(err))
			newConf = &tmpConf
		}
	}

	ch := &pdConfHandler{
		id:         id,
		version:    version,
		interval:   pdConfHandlerRefreshInterval,
		exit:       make(chan struct{}),
		pdConfCli:  pdCli,
		reloadFunc: reloadFunc,
	}
	ch.curConf.Store(newConf)
	return ch, nil
}
// Start launches the background goroutine that refreshes the config from PD.
func (ch *pdConfHandler) Start() {
	// Register with the WaitGroup before spawning so Close can wait for run.
	ch.wg.Add(1)
	go ch.run()
}
// Close signals the refresh goroutine to exit, waits for it to finish and
// then closes the PD config client.
// NOTE(review): a second call would close ch.exit again and panic.
func (ch *pdConfHandler) Close() {
	close(ch.exit)
	ch.wg.Wait()
	// Close the client only after run has returned.
	ch.pdConfCli.Close()
}
// GetConfig returns the config most recently stored in the handler.
// The returned value is meant to be read only.
func (ch *pdConfHandler) GetConfig() *Config {
	current := ch.curConf.Load()
	return current.(*Config)
}
// run is the body of the refresh goroutine. Until ch.exit is closed it waits
// ch.interval, asks PD for the config at ch.version, and, when PD reports a
// newer version that decodes and validates, calls reloadFunc with the old and
// new config, stores the new config and records the new version. Any failure
// along the way is logged and the loop simply waits for the next round.
//
// A panic inside the loop is recovered and logged; the goroutine then ends.
func (ch *pdConfHandler) run() {
	defer func() {
		if recovered := recover(); recovered != nil {
			logutil.Logger(context.Background()).Error("panic in the recoverable goroutine",
				zap.Reflect("r", recovered),
				zap.Stack("stack trace"))
		}
		ch.wg.Done()
	}()

	ctx := context.Background()
	for {
		// Wait for either the next refresh round or the shutdown signal.
		select {
		case <-ch.exit:
			return
		case <-time.After(ch.interval):
		}

		// fetch new config from PD
		status, version, content, err := ch.pdConfCli.Get(ctx, ch.version, tidbComponentName, ch.id)
		if err != nil {
			logutil.Logger(ctx).Error("PDConfHandler fetch new config error", zap.Error(err))
			continue
		}

		switch status.Code {
		case configpb.StatusCode_OK:
			// StatusCode_OK represents the request is successful and there is no change.
			continue
		case configpb.StatusCode_WRONG_VERSION:
			// StatusCode_WRONG_VERSION represents the request is successful and
			// the config has been updated; it is applied below.
		default:
			logutil.Logger(ctx).Error("PDConfHandler fetch new config PD error",
				zap.Int("code", int(status.Code)), zap.String("message", status.Message))
			continue
		}

		fetched, err := decodeConfig(content)
		if err != nil {
			logutil.Logger(ctx).Error("PDConfHandler decode config error", zap.Error(err))
			continue
		}
		if err := fetched.Valid(); err != nil {
			logutil.Logger(ctx).Error("PDConfHandler invalid config", zap.Error(err))
			continue
		}

		// Let the reload hook see both configs before the new one is published.
		previous := ch.curConf.Load().(*Config)
		ch.reloadFunc(previous, fetched)
		ch.curConf.Store(fetched)
		logutil.Logger(ctx).Info("PDConfHandler update config successfully",
			zap.String("fromVersion", ch.version.String()), zap.String("toVersion", version.String()))
		ch.version = version
	}
}
// encodeConfig serializes conf as TOML and returns the resulting text.
// If the encoder fails, a new error is returned whose message embeds the
// encoder's message.
func encodeConfig(conf *Config) (string, error) {
	var out bytes.Buffer
	if err := toml.NewEncoder(&out).Encode(conf); err != nil {
		return "", errors.New("encode config error=" + err.Error())
	}
	return out.String(), nil
}
// decodeConfig parses TOML content into a freshly allocated Config.
// The Config pointer is returned together with any decode error; the callers
// in this file discard it when the error is non-nil.
func decodeConfig(content string) (*Config, error) {
	cfg := &Config{}
	if _, err := toml.Decode(content, cfg); err != nil {
		return cfg, err
	}
	return cfg, nil
}