diff --git a/br/cmd/br/debug.go b/br/cmd/br/debug.go index 3b9cd8a703..c62dd677f9 100644 --- a/br/cmd/br/debug.go +++ b/br/cmd/br/debug.go @@ -16,6 +16,7 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/conn" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/metautil" @@ -384,7 +385,7 @@ func setPDConfigCommand() *cobra.Command { return errors.Trace(err) } - mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements, false) + mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements, false, conn.NormalVersionChecker) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index f90743e1bd..4ccf9f6561 100755 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -47,6 +47,15 @@ const ( DefaultMergeRegionKeyCount uint64 = 960000 ) +type VersionCheckerType int + +const ( + // default version checker + NormalVersionChecker VersionCheckerType = iota + // version checker for PiTR + StreamVersionChecker +) + // Mgr manages connections to a TiDB cluster. type Mgr struct { *pdutil.PdController @@ -177,6 +186,7 @@ func NewMgr( storeBehavior StoreBehavior, checkRequirements bool, needDomain bool, + versionCheckerType VersionCheckerType, ) (*Mgr, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("conn.NewMgr", opentracing.ChildOf(span.Context())) @@ -192,7 +202,16 @@ func NewMgr( return nil, errors.Trace(err) } if checkRequirements { - err = version.CheckClusterVersion(ctx, controller.GetPDClient(), version.CheckVersionForBR) + var checker version.VerChecker + switch versionCheckerType { + case NormalVersionChecker: + checker = version.CheckVersionForBR + case StreamVersionChecker: + checker = version.CheckVersionForBRPiTR + default: + return nil, errors.Errorf("unknown command type, comman code is %d", versionCheckerType) + } + err = version.CheckClusterVersion(ctx, controller.GetPDClient(), checker) if err != nil { return nil, errors.Annotate(err, "running BR in incompatible version of cluster, "+ "if you believe it's OK, use --check-requirements=false to skip.") diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index 311f57e9de..f100e4ea51 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/backup" "github.com/pingcap/tidb/br/pkg/checksum" + "github.com/pingcap/tidb/br/pkg/conn" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/logutil" @@ -252,7 +253,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig // Domain loads all table info into memory. By skipping Domain, we save // lots of memory (about 500MB for 40K 40 fields YCSB tables). needDomain := !skipStats - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/backup_raw.go b/br/pkg/task/backup_raw.go index 7fe34dfb52..6bb5aba79c 100644 --- a/br/pkg/task/backup_raw.go +++ b/br/pkg/task/backup_raw.go @@ -11,6 +11,7 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/backup" + "github.com/pingcap/tidb/br/pkg/conn" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/metautil" @@ -137,7 +138,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf } // Backup raw does not need domain. needDomain := false - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index d9479764f0..d4cf94c1a0 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -566,6 +566,7 @@ func NewMgr(ctx context.Context, keepalive keepalive.ClientParameters, checkRequirements bool, needDomain bool, + versionCheckerType conn.VersionCheckerType, ) (*conn.Mgr, error) { var ( tlsConf *tls.Config @@ -590,7 +591,7 @@ func NewMgr(ctx context.Context, // Is it necessary to remove `StoreBehavior`? return conn.NewMgr( ctx, g, pdAddress, tlsConf, securityOption, keepalive, conn.SkipTiFlash, - checkRequirements, needDomain, + checkRequirements, needDomain, versionCheckerType, ) } diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 09fe19348e..798bd1420b 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -401,7 +401,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf // Restore needs domain to do DDL. needDomain := true keepaliveCfg := GetKeepalive(&cfg.Config) - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, keepaliveCfg, cfg.CheckRequirements, needDomain) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, keepaliveCfg, cfg.CheckRequirements, needDomain, conn.NormalVersionChecker) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index 452cccfad8..6c15cd9989 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -67,7 +67,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR // Restore raw does not need domain. needDomain := false - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain, conn.NormalVersionChecker) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 5974c75f22..ac791a3404 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -288,7 +288,7 @@ type streamMgr struct { func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamStart bool) (*streamMgr, error) { mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), - cfg.CheckRequirements, true) + cfg.CheckRequirements, true, conn.StreamVersionChecker) if err != nil { return nil, errors.Trace(err) } @@ -308,10 +308,6 @@ func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, isStreamS if err != nil { return nil, errors.Trace(err) } - if backend.GetS3() == nil { - return nil, errors.Annotate(berrors.ErrStorageInvalidConfig, - "Only support s3 storage currently.") - } opts := storage.ExternalStorageOptions{ NoCredentials: cfg.NoCreds, @@ -767,7 +763,7 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre ctx, cancel := context.WithCancel(c) defer cancel() mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), - cfg.CheckRequirements, false) + cfg.CheckRequirements, false, conn.StreamVersionChecker) if err != nil { return err } @@ -813,7 +809,7 @@ func makeStatusController(ctx context.Context, cfg *StreamConfig, g glue.Glue) ( printer = stream.PrintTaskWithJSON(console) } mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), - cfg.CheckRequirements, false) + cfg.CheckRequirements, false, conn.StreamVersionChecker) if err != nil { return nil, err } @@ -1058,7 +1054,7 @@ func restoreStream( } mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), - cfg.CheckRequirements, true) + cfg.CheckRequirements, true, conn.StreamVersionChecker) if err != nil { return errors.Trace(err) } @@ -1534,7 +1530,7 @@ func buildPauseSafePointName(taskName string) string { func checkPiTRRequirements(ctx context.Context, g glue.Glue, cfg *RestoreConfig) error { mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), - cfg.CheckRequirements, true) + cfg.CheckRequirements, true, conn.StreamVersionChecker) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/version/version.go b/br/pkg/version/version.go index 17d8bc6728..c790125fd2 100644 --- a/br/pkg/version/version.go +++ b/br/pkg/version/version.go @@ -129,6 +129,37 @@ func CheckVersionForBackup(backupVersion *semver.Version) VerChecker { } } +// CheckVersionForBRPiTR checks whether version of the cluster and BR-pitr itself is compatible. +// Note: BR'version >= 6.1.0 at least in this function +func CheckVersionForBRPiTR(s *metapb.Store, tikvVersion *semver.Version) error { + BRVersion, err := semver.NewVersion(removeVAndHash(build.ReleaseVersion)) + if err != nil { + return errors.Annotatef(berrors.ErrVersionMismatch, "%s: invalid version, please recompile using `git fetch origin --tags && make build`", err) + } + + // tikvVersion should at least 6.1.0 + if tikvVersion.Major < 6 || (tikvVersion.Major == 6 && tikvVersion.Minor == 0) { + return errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s is too low when use PiTR, please update tikv's version to at least v6.1.0(v6.2.0+ recommanded)", + s.Address, tikvVersion) + } + + // The versions of BR and TiKV should be the same when use BR 6.1.0 + if BRVersion.Major == 6 && BRVersion.Minor == 1 { + if tikvVersion.Major != 6 || tikvVersion.Minor != 1 { + return errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s and BR %s version mismatch when use PiTR v6.1.0, please use the same version of BR", + s.Address, tikvVersion, build.ReleaseVersion) + } + } else { + // If BRVersion > v6.1.0, the version of TiKV should be at least v6.2.0 + if tikvVersion.Major == 6 && tikvVersion.Minor <= 1 { + return errors.Annotatef(berrors.ErrVersionMismatch, "TiKV node %s version %s and BR %s version mismatch when use PiTR v6.2.0+, please use the tikv with version v6.2.0+", + s.Address, tikvVersion, build.ReleaseVersion) + } + } + + return nil +} + // CheckVersionForBR checks whether version of the cluster and BR itself is compatible. func CheckVersionForBR(s *metapb.Store, tikvVersion *semver.Version) error { BRVersion, err := semver.NewVersion(removeVAndHash(build.ReleaseVersion)) diff --git a/br/pkg/version/version_test.go b/br/pkg/version/version_test.go index 84ffd74849..008e8a10d1 100644 --- a/br/pkg/version/version_test.go +++ b/br/pkg/version/version_test.go @@ -45,6 +45,74 @@ func TestCheckClusterVersion(t *testing.T) { Client: nil, } + { + build.ReleaseVersion = "v6.2.0" + mock.getAllStores = func() []*metapb.Store { + return []*metapb.Store{{Version: `v5.4.2`}} + } + err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR) + require.Error(t, err) + require.Regexp(t, `^TiKV .* is too low when use PiTR, please `, err.Error()) + } + + { + build.ReleaseVersion = "v6.2.0" + mock.getAllStores = func() []*metapb.Store { + return []*metapb.Store{{Version: `v6.0.0`}} + } + err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR) + require.Error(t, err) + require.Regexp(t, `^TiKV .* is too low when use PiTR, please `, err.Error()) + } + + { + build.ReleaseVersion = "v6.2.0" + mock.getAllStores = func() []*metapb.Store { + return []*metapb.Store{{Version: `v6.1.0`}} + } + err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR) + require.Error(t, err) + require.Regexp(t, `^TiKV .* version mismatch when use PiTR v6.2.0\+, please `, err.Error()) + } + + { + build.ReleaseVersion = "v6.2.0" + mock.getAllStores = func() []*metapb.Store { + return []*metapb.Store{{Version: `v6.2.0`}} + } + err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR) + require.NoError(t, err) + } + + { + build.ReleaseVersion = "v6.1.0" + mock.getAllStores = func() []*metapb.Store { + return []*metapb.Store{{Version: `v5.4.2`}} + } + err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR) + require.Error(t, err) + require.Regexp(t, `^TiKV .* is too low when use PiTR, please `, err.Error()) + } + + { + build.ReleaseVersion = "v6.1.0" + mock.getAllStores = func() []*metapb.Store { + return []*metapb.Store{{Version: `v6.1.0`}} + } + err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR) + require.NoError(t, err) + } + + { + build.ReleaseVersion = "v6.1.0" + mock.getAllStores = func() []*metapb.Store { + return []*metapb.Store{{Version: `v6.2.0`}} + } + err := CheckClusterVersion(context.Background(), &mock, CheckVersionForBRPiTR) + require.Error(t, err) + require.Regexp(t, `^TiKV .* version mismatch when use PiTR v6.1.0, please `, err.Error()) + } + { build.ReleaseVersion = "v4.0.5" mock.getAllStores = func() []*metapb.Store {