log-backup: add version check for pitr (#36268)
close pingcap/tidb#36267
This commit is contained in:
@ -16,6 +16,7 @@ import (
|
||||
backuppb "github.com/pingcap/kvproto/pkg/brpb"
|
||||
"github.com/pingcap/kvproto/pkg/import_sstpb"
|
||||
"github.com/pingcap/log"
|
||||
"github.com/pingcap/tidb/br/pkg/conn"
|
||||
berrors "github.com/pingcap/tidb/br/pkg/errors"
|
||||
"github.com/pingcap/tidb/br/pkg/logutil"
|
||||
"github.com/pingcap/tidb/br/pkg/metautil"
|
||||
@ -384,7 +385,7 @@ func setPDConfigCommand() *cobra.Command {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements, false)
|
||||
mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements, false, conn.NormalVersionChecker)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -47,6 +47,15 @@ const (
|
||||
DefaultMergeRegionKeyCount uint64 = 960000
|
||||
)
|
||||
|
||||
type VersionCheckerType int
|
||||
|
||||
const (
|
||||
// default version checker
|
||||
NormalVersionChecker VersionCheckerType = iota
|
||||
// version checker for PiTR
|
||||
StreamVersionChecker
|
||||
)
|
||||
|
||||
// Mgr manages connections to a TiDB cluster.
|
||||
type Mgr struct {
|
||||
*pdutil.PdController
|
||||
@ -177,6 +186,7 @@ func NewMgr(
|
||||
storeBehavior StoreBehavior,
|
||||
checkRequirements bool,
|
||||
needDomain bool,
|
||||
versionCheckerType VersionCheckerType,
|
||||
) (*Mgr, error) {
|
||||
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
|
||||
span1 := span.Tracer().StartSpan("conn.NewMgr", opentracing.ChildOf(span.Context()))
|
||||
@ -192,7 +202,16 @@ func NewMgr(
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if checkRequirements {
|
||||
err = version.CheckClusterVersion(ctx, controller.GetPDClient(), version.CheckVersionForBR)
|
||||
var checker version.VerChecker
|
||||
switch versionCheckerType {
|
||||
case NormalVersionChecker:
|
||||
checker = version.CheckVersionForBR
|
||||
case StreamVersionChecker:
|
||||
checker = version.CheckVersionForBRPiTR
|
||||
default:
|
||||
return nil, errors.Errorf("unknown command type, comman code is %d", versionCheckerType)
|
||||
}
|
||||
err = version.CheckClusterVersion(ctx, controller.GetPDClient(), checker)
|
||||
if err != nil {
|
||||
return nil, errors.Annotate(err, "running BR in incompatible version of cluster, "+
|
||||
"if you believe it's OK, use --check-requirements=false to skip.")
|
||||
|
||||
@ -18,6 +18,7 @@ import (
|
||||
"github.com/pingcap/log"
|
||||
"github.com/pingcap/tidb/br/pkg/backup"
|
||||
"github.com/pingcap/tidb/br/pkg/checksum"
|
||||
"github.com/pingcap/tidb/br/pkg/conn"
|
||||
berrors "github.com/pingcap/tidb/br/pkg/errors"
|
||||
"github.com/pingcap/tidb/br/pkg/glue"
|
||||
"github.com/pingcap/tidb/br/pkg/logutil"
|
||||
@ -252,7 +253,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
|
||||
// Domain loads all table info into memory. By skipping Domain, we save
|
||||
// lots of memory (about 500MB for 40K 40 fields YCSB tables).
|
||||
needDomain := !skipStats
|
||||
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain)
|
||||
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
backuppb "github.com/pingcap/kvproto/pkg/brpb"
|
||||
"github.com/pingcap/log"
|
||||
"github.com/pingcap/tidb/br/pkg/backup"
|
||||
"github.com/pingcap/tidb/br/pkg/conn"
|
||||
berrors "github.com/pingcap/tidb/br/pkg/errors"
|
||||
"github.com/pingcap/tidb/br/pkg/glue"
|
||||
"github.com/pingcap/tidb/br/pkg/metautil"
|
||||
@ -137,7 +138,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
|
||||
}
|
||||
// Backup raw does not need domain.
|
||||
needDomain := false
|
||||
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain)
|
||||
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -566,6 +566,7 @@ func NewMgr(ctx context.Context,
|
||||
keepalive keepalive.ClientParameters,
|
||||
checkRequirements bool,
|
||||
needDomain bool,
|
||||
versionCheckerType conn.VersionCheckerType,
|
||||
) (*conn.Mgr, error) {
|
||||
var (
|
||||
tlsConf *tls.Config
|
||||
@ -590,7 +591,7 @@ func NewMgr(ctx context.Context,
|
||||
// Is it necessary to remove `StoreBehavior`?
|
||||
return conn.NewMgr(
|
||||
ctx, g, pdAddress, tlsConf, securityOption, keepalive, conn.SkipTiFlash,
|
||||
checkRequirements, needDomain,
|
||||
checkRequirements, needDomain, versionCheckerType,
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@ -401,7 +401,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
|
||||
// Restore needs domain to do DDL.
|
||||
needDomain := true
|
||||
keepaliveCfg := GetKeepalive(&cfg.Config)
|
||||
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, keepaliveCfg, cfg.CheckRequirements, needDomain)
|
||||
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, keepaliveCfg, cfg.CheckRequirements, needDomain, conn.NormalVersionChecker)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -67,7 +67,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
|
||||
|
||||
// Restore raw does not need domain.
|
||||
needDomain := false
|
||||
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain)
|
||||
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -288,7 +288,7 @@ type streamMgr struct {
|
||||
|
||||
func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamStart bool) (*streamMgr, error) {
|
||||
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
|
||||
cfg.CheckRequirements, true)
|
||||
cfg.CheckRequirements, true, conn.StreamVersionChecker)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
@ -308,10 +308,6 @@ func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamS
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if backend.GetS3() == nil {
|
||||
return nil, errors.Annotate(berrors.ErrStorageInvalidConfig,
|
||||
"Only support s3 storage currently.")
|
||||
}
|
||||
|
||||
opts := storage.ExternalStorageOptions{
|
||||
NoCredentials: cfg.NoCreds,
|
||||
@ -767,7 +763,7 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre
|
||||
ctx, cancel := context.WithCancel(c)
|
||||
defer cancel()
|
||||
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
|
||||
cfg.CheckRequirements, false)
|
||||
cfg.CheckRequirements, false, conn.StreamVersionChecker)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -813,7 +809,7 @@ func makeStatusController(ctx context.Context, cfg *StreamConfig, g glue.Glue) (
|
||||
printer = stream.PrintTaskWithJSON(console)
|
||||
}
|
||||
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
|
||||
cfg.CheckRequirements, false)
|
||||
cfg.CheckRequirements, false, conn.StreamVersionChecker)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1058,7 +1054,7 @@ func restoreStream(
|
||||
}
|
||||
|
||||
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
|
||||
cfg.CheckRequirements, true)
|
||||
cfg.CheckRequirements, true, conn.StreamVersionChecker)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -1534,7 +1530,7 @@ func buildPauseSafePointName(taskName string) string {
|
||||
|
||||
func checkPiTRRequirements(ctx context.Context, g glue.Glue, cfg *RestoreConfig) error {
|
||||
mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config),
|
||||
cfg.CheckRequirements, true)
|
||||
cfg.CheckRequirements, true, conn.StreamVersionChecker)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -129,6 +129,37 @@ func CheckVersionForBackup(backupVersion *semver.Version) VerChecker {
|
||||
}
|
||||
}
|
||||
|
||||
// CheckVersionForBRPiTR checks whether version of the cluster and BR-pitr itself is compatible.
|
||||
// Note: BR'version >= 6.1.0 at least in this function
|
||||
func CheckVersionForBRPiTR(s *metapb.Store, tikvVersion *semver.Version) error {
|
||||
BRVersion, err := semver.NewVersion(removeVAndHash(build.ReleaseVersion))
|
||||
if err != nil {
|
||||
return errors.Annotatef(berrors.ErrVersionMismatch, "%s: invalid version, please recompile using `git fetch origin --tags && make build`", err)
|
||||
}
|
||||
|
||||
// tikvVersion should at least 6.1.0
|
||||
if tikvVersion.Major < 6 || (tikvVersion.Major == 6 && tikvVersion.Minor == 0) {
|
||||
return errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s is too low when use PiTR, please update tikv's version to at least v6.1.0(v6.2.0+ recommanded)",
|
||||
s.Address, tikvVersion)
|
||||
}
|
||||
|
||||
// The versions of BR and TiKV should be the same when use BR 6.1.0
|
||||
if BRVersion.Major == 6 && BRVersion.Minor == 1 {
|
||||
if tikvVersion.Major != 6 || tikvVersion.Minor != 1 {
|
||||
return errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s and BR %s version mismatch when use PiTR v6.1.0, please use the same version of BR",
|
||||
s.Address, tikvVersion, build.ReleaseVersion)
|
||||
}
|
||||
} else {
|
||||
// If BRVersion > v6.1.0, the version of TiKV should be at least v6.2.0
|
||||
if tikvVersion.Major == 6 && tikvVersion.Minor <= 1 {
|
||||
return errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s and BR %s version mismatch when use PiTR v6.2.0+, please use the tikv with version v6.2.0+",
|
||||
s.Address, tikvVersion, build.ReleaseVersion)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CheckVersionForBR checks whether version of the cluster and BR itself is compatible.
|
||||
func CheckVersionForBR(s *metapb.Store, tikvVersion *semver.Version) error {
|
||||
BRVersion, err := semver.NewVersion(removeVAndHash(build.ReleaseVersion))
|
||||
|
||||
@ -45,6 +45,74 @@ func TestCheckClusterVersion(t *testing.T) {
|
||||
Client: nil,
|
||||
}
|
||||
|
||||
{
|
||||
build.ReleaseVersion = "v6.2.0"
|
||||
mock.getAllStores = func() []*metapb.Store {
|
||||
return []*metapb.Store{{Version: `v5.4.2`}}
|
||||
}
|
||||
err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR)
|
||||
require.Error(t, err)
|
||||
require.Regexp(t, `^TiKV .* is too low when use PiTR, please `, err.Error())
|
||||
}
|
||||
|
||||
{
|
||||
build.ReleaseVersion = "v6.2.0"
|
||||
mock.getAllStores = func() []*metapb.Store {
|
||||
return []*metapb.Store{{Version: `v6.0.0`}}
|
||||
}
|
||||
err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR)
|
||||
require.Error(t, err)
|
||||
require.Regexp(t, `^TiKV .* is too low when use PiTR, please `, err.Error())
|
||||
}
|
||||
|
||||
{
|
||||
build.ReleaseVersion = "v6.2.0"
|
||||
mock.getAllStores = func() []*metapb.Store {
|
||||
return []*metapb.Store{{Version: `v6.1.0`}}
|
||||
}
|
||||
err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR)
|
||||
require.Error(t, err)
|
||||
require.Regexp(t, `^TiKV .* version mismatch when use PiTR v6.2.0\+, please `, err.Error())
|
||||
}
|
||||
|
||||
{
|
||||
build.ReleaseVersion = "v6.2.0"
|
||||
mock.getAllStores = func() []*metapb.Store {
|
||||
return []*metapb.Store{{Version: `v6.2.0`}}
|
||||
}
|
||||
err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
{
|
||||
build.ReleaseVersion = "v6.1.0"
|
||||
mock.getAllStores = func() []*metapb.Store {
|
||||
return []*metapb.Store{{Version: `v5.4.2`}}
|
||||
}
|
||||
err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR)
|
||||
require.Error(t, err)
|
||||
require.Regexp(t, `^TiKV .* is too low when use PiTR, please `, err.Error())
|
||||
}
|
||||
|
||||
{
|
||||
build.ReleaseVersion = "v6.1.0"
|
||||
mock.getAllStores = func() []*metapb.Store {
|
||||
return []*metapb.Store{{Version: `v6.1.0`}}
|
||||
}
|
||||
err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
{
|
||||
build.ReleaseVersion = "v6.1.0"
|
||||
mock.getAllStores = func() []*metapb.Store {
|
||||
return []*metapb.Store{{Version: `v6.2.0`}}
|
||||
}
|
||||
err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR)
|
||||
require.Error(t, err)
|
||||
require.Regexp(t, `^TiKV .* version mismatch when use PiTR v6.1.0, please `, err.Error())
|
||||
}
|
||||
|
||||
{
|
||||
build.ReleaseVersion = "v4.0.5"
|
||||
mock.getAllStores = func() []*metapb.Store {
|
||||
|
||||
Reference in New Issue
Block a user