From 91c14a45bb01f93088cf39017ab0a952fea58ac4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Thu, 21 Nov 2024 10:00:38 +0800 Subject: [PATCH] br/stream: Added Version Check for `Load`ing Migrations. (#57541) close pingcap/tidb#57540 --- DEPS.bzl | 12 ++++----- br/pkg/errors/errors.go | 19 ++++++------- br/pkg/stream/BUILD.bazel | 3 ++- br/pkg/stream/stream_metas.go | 43 ++++++++++++++++++++++++++---- br/pkg/stream/stream_metas_test.go | 24 +++++++++++++++++ errors.toml | 5 ++++ go.mod | 2 +- go.sum | 4 +-- 8 files changed, 88 insertions(+), 24 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index e0e72e6ce6..4535365109 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -5776,13 +5776,13 @@ def go_deps(): name = "com_github_pingcap_kvproto", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/kvproto", - sha256 = "d470ef683433f2c5bc7a1e610da44d516908d326a0341c07208af76a30f0d8a6", - strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20241113043844-e1fa7ea8c302", + sha256 = "7d3b6f6b755b027ba138d3069238f4a4e91d0d1f573de17cda00985616adc843", + strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20241120022153-92b0414aeed8", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241113043844-e1fa7ea8c302.zip", - "http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241113043844-e1fa7ea8c302.zip", - "https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241113043844-e1fa7ea8c302.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241113043844-e1fa7ea8c302.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip", + "http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip", + "https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip", ], ) go_repository( diff --git a/br/pkg/errors/errors.go b/br/pkg/errors/errors.go index 3bd2ab776c..6a9449eff9 100644 --- a/br/pkg/errors/errors.go +++ b/br/pkg/errors/errors.go @@ -31,15 +31,16 @@ func IsContextCanceled(err error) bool { // BR errors. var ( - ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown")) - ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("BR:Common:ErrInvalidArgument")) - ErrUndefinedRestoreDbOrTable = errors.Normalize("undefined restore databases or tables", errors.RFCCodeText("BR:Common:ErrUndefinedDbOrTable")) - ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch")) - ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect")) - ErrInvalidMetaFile = errors.Normalize("invalid metafile: %s", errors.RFCCodeText("BR:Common:ErrInvalidMetaFile")) - ErrEnvNotSpecified = errors.Normalize("environment variable not found", errors.RFCCodeText("BR:Common:ErrEnvNotSpecified")) - ErrUnsupportedOperation = errors.Normalize("the operation is not supported", errors.RFCCodeText("BR:Common:ErrUnsupportedOperation")) - ErrInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Common:ErrInvalidRange")) + ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown")) + ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("BR:Common:ErrInvalidArgument")) + ErrUndefinedRestoreDbOrTable = errors.Normalize("undefined restore databases or tables", errors.RFCCodeText("BR:Common:ErrUndefinedDbOrTable")) + ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch")) + ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect")) + ErrInvalidMetaFile = errors.Normalize("invalid metafile: %s", errors.RFCCodeText("BR:Common:ErrInvalidMetaFile")) + ErrEnvNotSpecified = errors.Normalize("environment variable not found", errors.RFCCodeText("BR:Common:ErrEnvNotSpecified")) + ErrUnsupportedOperation = errors.Normalize("the operation is not supported", errors.RFCCodeText("BR:Common:ErrUnsupportedOperation")) + ErrInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Common:ErrInvalidRange")) + ErrMigrationVersionNotSupported = errors.Normalize("the migration version isn't supported", errors.RFCCodeText("BR:Common:ErrMigrationVersionNotSupported")) ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed")) ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound")) diff --git a/br/pkg/stream/BUILD.bazel b/br/pkg/stream/BUILD.bazel index e72ef472d2..e537b651c3 100644 --- a/br/pkg/stream/BUILD.bazel +++ b/br/pkg/stream/BUILD.bazel @@ -34,6 +34,7 @@ go_library( "//pkg/util/codec", "//pkg/util/mathutil", "//pkg/util/table-filter", + "//pkg/util/versioninfo", "@com_github_docker_go_units//:go-units", "@com_github_fatih_color//:color", "@com_github_klauspost_compress//zstd", @@ -64,7 +65,7 @@ go_test( ], embed = [":stream"], flaky = True, - shard_count = 44, + shard_count = 46, deps = [ "//br/pkg/storage", "//br/pkg/streamhelper", diff --git a/br/pkg/stream/stream_metas.go b/br/pkg/stream/stream_metas.go index 14c65da097..bfbebafebd 100644 --- a/br/pkg/stream/stream_metas.go +++ b/br/pkg/stream/stream_metas.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/mathutil" + "github.com/pingcap/tidb/pkg/util/versioninfo" "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -38,8 +39,17 @@ const ( baseTmp = "BASE_TMP" metaSuffix = ".meta" migrationPrefix = "v1/migrations" + + SupportedMigVersion = pb.MigrationVersion_M1 ) +func NewMigration() *pb.Migration { + return &pb.Migration{ + Version: pb.MigrationVersion_M1, + Creator: fmt.Sprintf("br;commit=%s;branch=%s", versioninfo.TiDBGitHash, versioninfo.TiDBGitBranch), + } +} + type StreamMetadataSet struct { // if set true, the metadata and datafile won't be removed DryRun bool @@ -196,7 +206,7 @@ func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch( hst := ms.hook(st) est := MigerationExtension(hst) est.Hooks = updateFnHook{updateFn: updateFn} - res := MigratedTo{NewBase: new(pb.Migration)} + res := MigratedTo{NewBase: NewMigration()} est.doTruncateLogs(ctx, ms, from, &res) if bst, ok := hst.ExternalStorage.(*storage.Batched); ok { @@ -517,7 +527,7 @@ func MigerationExtension(s storage.ExternalStorage) MigrationExt { // Merge merges two migrations. // The merged migration contains all operations from the two arguments. func MergeMigrations(m1 *pb.Migration, m2 *pb.Migration) *pb.Migration { - out := new(pb.Migration) + out := NewMigration() out.EditMeta = mergeMetaEdits(m1.GetEditMeta(), m2.GetEditMeta()) out.Compactions = append(out.Compactions, m1.GetCompactions()...) out.Compactions = append(out.Compactions, m2.GetCompactions()...) @@ -563,6 +573,24 @@ type OrderedMigration struct { Content pb.Migration `json:"content"` } +func (o *OrderedMigration) unmarshalContent(b []byte) error { + err := o.Content.Unmarshal(b) + if err != nil { + return err + } + if o.Content.Version > SupportedMigVersion { + return errors.Annotatef( + berrors.ErrMigrationVersionNotSupported, + "the migration at %s has version %s(%d), the max version we support is %s(%d)", + o.Path, + o.Content.Version, o.Content.Version, + SupportedMigVersion, SupportedMigVersion, + ) + } + + return nil +} + // Load loads the current living migrations from the storage. func (m MigrationExt) Load(ctx context.Context) (Migrations, error) { opt := &storage.WalkOption{ @@ -575,6 +603,11 @@ func (m MigrationExt) Load(ctx context.Context) (Migrations, error) { if err != nil { return errors.Trace(err) } + err = t.unmarshalContent(b) + if err != nil { + return err + } + if t.SeqNum == baseMigrationSN { // NOTE: the legacy truncating isn't implemented by appending a migration. // We load their checkpoint here to be compatible with them. @@ -585,7 +618,7 @@ func (m MigrationExt) Load(ctx context.Context) (Migrations, error) { } t.Content.TruncatedTo = max(truncatedTs, t.Content.TruncatedTo) } - return t.Content.Unmarshal(b) + return nil }) collected := iter.CollectAll(ctx, items) if collected.Err != nil { @@ -605,7 +638,7 @@ func (m MigrationExt) Load(ctx context.Context) (Migrations, error) { // The BASE migration isn't persisted. // This happens when `migrate-to` wasn't run ever. result = Migrations{ - Base: new(pb.Migration), + Base: NewMigration(), Layers: collected.Item, } } @@ -818,7 +851,7 @@ func (m MigrationExt) MigrateTo(ctx context.Context, mig *pb.Migration, opts ... } result := MigratedTo{ - NewBase: new(pb.Migration), + NewBase: NewMigration(), } // Fills: EditMeta for new Base. m.doMetaEdits(ctx, mig, &result) diff --git a/br/pkg/stream/stream_metas_test.go b/br/pkg/stream/stream_metas_test.go index c6055459a2..b292a3f8a9 100644 --- a/br/pkg/stream/stream_metas_test.go +++ b/br/pkg/stream/stream_metas_test.go @@ -732,6 +732,12 @@ func mTruncatedTo(to uint64) migOP { } } +func mVersion(ver backuppb.MigrationVersion) migOP { + return func(m *backuppb.Migration) { + m.Version = ver + } +} + // tmp creates a temporary storage. func tmp(t *testing.T) *storage.LocalStorage { tmpDir := t.TempDir() @@ -2830,3 +2836,21 @@ func TestWithSimpleTruncate(t *testing.T) { } } } + +func TestUnsupportedVersion(t *testing.T) { + s := tmp(t) + m := mig(mVersion(backuppb.MigrationVersion(65535))) + pmig(s, 1, m) + + est := MigerationExtension(s) + ctx := context.Background() + _, err := est.Load(ctx) + require.Error(t, err) + require.ErrorContains(t, err, "ErrMigrationVersionNotSupported") +} + +func TestCreator(t *testing.T) { + mig := NewMigration() + require.Contains(t, mig.Creator, "br") + require.Equal(t, mig.Version, SupportedMigVersion) +} diff --git a/errors.toml b/errors.toml index 1f9329951d..554ba7cb6f 100644 --- a/errors.toml +++ b/errors.toml @@ -56,6 +56,11 @@ error = ''' invalid restore range ''' +["BR:Common:ErrMigrationVersionNotSupported"] +error = ''' +the migration version isn't supported +''' + ["BR:Common:ErrUndefinedDbOrTable"] error = ''' undefined restore databases or tables diff --git a/go.mod b/go.mod index d9122f2beb..63c38192e3 100644 --- a/go.mod +++ b/go.mod @@ -87,7 +87,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 github.com/pingcap/fn v1.0.0 - github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 + github.com/pingcap/kvproto v0.0.0-20241120022153-92b0414aeed8 github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e diff --git a/go.sum b/go.sum index 7f4487adaa..1e03c2ee88 100644 --- a/go.sum +++ b/go.sum @@ -672,8 +672,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 h1:ynwwqr0rLliSOJcx0wHMu4T/NiPXHlK48mk2DCrBKCI= -github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20241120022153-92b0414aeed8 h1:aNNifhc6xCjXKejjiNYtJJLFNMXnoDiXxkJIg1JErQE= +github.com/pingcap/kvproto v0.0.0-20241120022153-92b0414aeed8/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfUnQGqft0ud+xVFuCdp1XkVL0X1E=