br/stream: Added Version Check for Loading Migrations. (#57541)
close pingcap/tidb#57540
This commit is contained in:
12
DEPS.bzl
12
DEPS.bzl
@ -5776,13 +5776,13 @@ def go_deps():
|
||||
name = "com_github_pingcap_kvproto",
|
||||
build_file_proto_mode = "disable_global",
|
||||
importpath = "github.com/pingcap/kvproto",
|
||||
sha256 = "d470ef683433f2c5bc7a1e610da44d516908d326a0341c07208af76a30f0d8a6",
|
||||
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20241113043844-e1fa7ea8c302",
|
||||
sha256 = "7d3b6f6b755b027ba138d3069238f4a4e91d0d1f573de17cda00985616adc843",
|
||||
strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20241120022153-92b0414aeed8",
|
||||
urls = [
|
||||
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241113043844-e1fa7ea8c302.zip",
|
||||
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241113043844-e1fa7ea8c302.zip",
|
||||
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241113043844-e1fa7ea8c302.zip",
|
||||
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241113043844-e1fa7ea8c302.zip",
|
||||
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip",
|
||||
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip",
|
||||
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip",
|
||||
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120022153-92b0414aeed8.zip",
|
||||
],
|
||||
)
|
||||
go_repository(
|
||||
|
||||
@ -31,15 +31,16 @@ func IsContextCanceled(err error) bool {
|
||||
|
||||
// BR errors.
|
||||
var (
|
||||
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
|
||||
ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("BR:Common:ErrInvalidArgument"))
|
||||
ErrUndefinedRestoreDbOrTable = errors.Normalize("undefined restore databases or tables", errors.RFCCodeText("BR:Common:ErrUndefinedDbOrTable"))
|
||||
ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch"))
|
||||
ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect"))
|
||||
ErrInvalidMetaFile = errors.Normalize("invalid metafile: %s", errors.RFCCodeText("BR:Common:ErrInvalidMetaFile"))
|
||||
ErrEnvNotSpecified = errors.Normalize("environment variable not found", errors.RFCCodeText("BR:Common:ErrEnvNotSpecified"))
|
||||
ErrUnsupportedOperation = errors.Normalize("the operation is not supported", errors.RFCCodeText("BR:Common:ErrUnsupportedOperation"))
|
||||
ErrInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Common:ErrInvalidRange"))
|
||||
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
|
||||
ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("BR:Common:ErrInvalidArgument"))
|
||||
ErrUndefinedRestoreDbOrTable = errors.Normalize("undefined restore databases or tables", errors.RFCCodeText("BR:Common:ErrUndefinedDbOrTable"))
|
||||
ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch"))
|
||||
ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect"))
|
||||
ErrInvalidMetaFile = errors.Normalize("invalid metafile: %s", errors.RFCCodeText("BR:Common:ErrInvalidMetaFile"))
|
||||
ErrEnvNotSpecified = errors.Normalize("environment variable not found", errors.RFCCodeText("BR:Common:ErrEnvNotSpecified"))
|
||||
ErrUnsupportedOperation = errors.Normalize("the operation is not supported", errors.RFCCodeText("BR:Common:ErrUnsupportedOperation"))
|
||||
ErrInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Common:ErrInvalidRange"))
|
||||
ErrMigrationVersionNotSupported = errors.Normalize("the migration version isn't supported", errors.RFCCodeText("BR:Common:ErrMigrationVersionNotSupported"))
|
||||
|
||||
ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
|
||||
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
|
||||
|
||||
@ -34,6 +34,7 @@ go_library(
|
||||
"//pkg/util/codec",
|
||||
"//pkg/util/mathutil",
|
||||
"//pkg/util/table-filter",
|
||||
"//pkg/util/versioninfo",
|
||||
"@com_github_docker_go_units//:go-units",
|
||||
"@com_github_fatih_color//:color",
|
||||
"@com_github_klauspost_compress//zstd",
|
||||
@ -64,7 +65,7 @@ go_test(
|
||||
],
|
||||
embed = [":stream"],
|
||||
flaky = True,
|
||||
shard_count = 44,
|
||||
shard_count = 46,
|
||||
deps = [
|
||||
"//br/pkg/storage",
|
||||
"//br/pkg/streamhelper",
|
||||
|
||||
@ -27,6 +27,7 @@ import (
|
||||
"github.com/pingcap/tidb/br/pkg/utils/iter"
|
||||
"github.com/pingcap/tidb/pkg/util"
|
||||
"github.com/pingcap/tidb/pkg/util/mathutil"
|
||||
"github.com/pingcap/tidb/pkg/util/versioninfo"
|
||||
"go.uber.org/multierr"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
@ -38,8 +39,17 @@ const (
|
||||
baseTmp = "BASE_TMP"
|
||||
metaSuffix = ".meta"
|
||||
migrationPrefix = "v1/migrations"
|
||||
|
||||
SupportedMigVersion = pb.MigrationVersion_M1
|
||||
)
|
||||
|
||||
func NewMigration() *pb.Migration {
|
||||
return &pb.Migration{
|
||||
Version: pb.MigrationVersion_M1,
|
||||
Creator: fmt.Sprintf("br;commit=%s;branch=%s", versioninfo.TiDBGitHash, versioninfo.TiDBGitBranch),
|
||||
}
|
||||
}
|
||||
|
||||
type StreamMetadataSet struct {
|
||||
// if set true, the metadata and datafile won't be removed
|
||||
DryRun bool
|
||||
@ -196,7 +206,7 @@ func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(
|
||||
hst := ms.hook(st)
|
||||
est := MigerationExtension(hst)
|
||||
est.Hooks = updateFnHook{updateFn: updateFn}
|
||||
res := MigratedTo{NewBase: new(pb.Migration)}
|
||||
res := MigratedTo{NewBase: NewMigration()}
|
||||
est.doTruncateLogs(ctx, ms, from, &res)
|
||||
|
||||
if bst, ok := hst.ExternalStorage.(*storage.Batched); ok {
|
||||
@ -517,7 +527,7 @@ func MigerationExtension(s storage.ExternalStorage) MigrationExt {
|
||||
// Merge merges two migrations.
|
||||
// The merged migration contains all operations from the two arguments.
|
||||
func MergeMigrations(m1 *pb.Migration, m2 *pb.Migration) *pb.Migration {
|
||||
out := new(pb.Migration)
|
||||
out := NewMigration()
|
||||
out.EditMeta = mergeMetaEdits(m1.GetEditMeta(), m2.GetEditMeta())
|
||||
out.Compactions = append(out.Compactions, m1.GetCompactions()...)
|
||||
out.Compactions = append(out.Compactions, m2.GetCompactions()...)
|
||||
@ -563,6 +573,24 @@ type OrderedMigration struct {
|
||||
Content pb.Migration `json:"content"`
|
||||
}
|
||||
|
||||
func (o *OrderedMigration) unmarshalContent(b []byte) error {
|
||||
err := o.Content.Unmarshal(b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if o.Content.Version > SupportedMigVersion {
|
||||
return errors.Annotatef(
|
||||
berrors.ErrMigrationVersionNotSupported,
|
||||
"the migration at %s has version %s(%d), the max version we support is %s(%d)",
|
||||
o.Path,
|
||||
o.Content.Version, o.Content.Version,
|
||||
SupportedMigVersion, SupportedMigVersion,
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Load loads the current living migrations from the storage.
|
||||
func (m MigrationExt) Load(ctx context.Context) (Migrations, error) {
|
||||
opt := &storage.WalkOption{
|
||||
@ -575,6 +603,11 @@ func (m MigrationExt) Load(ctx context.Context) (Migrations, error) {
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = t.unmarshalContent(b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if t.SeqNum == baseMigrationSN {
|
||||
// NOTE: the legacy truncating isn't implemented by appending a migration.
|
||||
// We load their checkpoint here to be compatible with them.
|
||||
@ -585,7 +618,7 @@ func (m MigrationExt) Load(ctx context.Context) (Migrations, error) {
|
||||
}
|
||||
t.Content.TruncatedTo = max(truncatedTs, t.Content.TruncatedTo)
|
||||
}
|
||||
return t.Content.Unmarshal(b)
|
||||
return nil
|
||||
})
|
||||
collected := iter.CollectAll(ctx, items)
|
||||
if collected.Err != nil {
|
||||
@ -605,7 +638,7 @@ func (m MigrationExt) Load(ctx context.Context) (Migrations, error) {
|
||||
// The BASE migration isn't persisted.
|
||||
// This happens when `migrate-to` wasn't run ever.
|
||||
result = Migrations{
|
||||
Base: new(pb.Migration),
|
||||
Base: NewMigration(),
|
||||
Layers: collected.Item,
|
||||
}
|
||||
}
|
||||
@ -818,7 +851,7 @@ func (m MigrationExt) MigrateTo(ctx context.Context, mig *pb.Migration, opts ...
|
||||
}
|
||||
|
||||
result := MigratedTo{
|
||||
NewBase: new(pb.Migration),
|
||||
NewBase: NewMigration(),
|
||||
}
|
||||
// Fills: EditMeta for new Base.
|
||||
m.doMetaEdits(ctx, mig, &result)
|
||||
|
||||
@ -732,6 +732,12 @@ func mTruncatedTo(to uint64) migOP {
|
||||
}
|
||||
}
|
||||
|
||||
func mVersion(ver backuppb.MigrationVersion) migOP {
|
||||
return func(m *backuppb.Migration) {
|
||||
m.Version = ver
|
||||
}
|
||||
}
|
||||
|
||||
// tmp creates a temporary storage.
|
||||
func tmp(t *testing.T) *storage.LocalStorage {
|
||||
tmpDir := t.TempDir()
|
||||
@ -2830,3 +2836,21 @@ func TestWithSimpleTruncate(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnsupportedVersion(t *testing.T) {
|
||||
s := tmp(t)
|
||||
m := mig(mVersion(backuppb.MigrationVersion(65535)))
|
||||
pmig(s, 1, m)
|
||||
|
||||
est := MigerationExtension(s)
|
||||
ctx := context.Background()
|
||||
_, err := est.Load(ctx)
|
||||
require.Error(t, err)
|
||||
require.ErrorContains(t, err, "ErrMigrationVersionNotSupported")
|
||||
}
|
||||
|
||||
func TestCreator(t *testing.T) {
|
||||
mig := NewMigration()
|
||||
require.Contains(t, mig.Creator, "br")
|
||||
require.Equal(t, mig.Version, SupportedMigVersion)
|
||||
}
|
||||
|
||||
@ -56,6 +56,11 @@ error = '''
|
||||
invalid restore range
|
||||
'''
|
||||
|
||||
["BR:Common:ErrMigrationVersionNotSupported"]
|
||||
error = '''
|
||||
the migration version isn't supported
|
||||
'''
|
||||
|
||||
["BR:Common:ErrUndefinedDbOrTable"]
|
||||
error = '''
|
||||
undefined restore databases or tables
|
||||
|
||||
2
go.mod
2
go.mod
@ -87,7 +87,7 @@ require (
|
||||
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f
|
||||
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
|
||||
github.com/pingcap/fn v1.0.0
|
||||
github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302
|
||||
github.com/pingcap/kvproto v0.0.0-20241120022153-92b0414aeed8
|
||||
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d
|
||||
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
|
||||
github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e
|
||||
|
||||
4
go.sum
4
go.sum
@ -672,8 +672,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
|
||||
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
|
||||
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
|
||||
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
|
||||
github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302 h1:ynwwqr0rLliSOJcx0wHMu4T/NiPXHlK48mk2DCrBKCI=
|
||||
github.com/pingcap/kvproto v0.0.0-20241113043844-e1fa7ea8c302/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
|
||||
github.com/pingcap/kvproto v0.0.0-20241120022153-92b0414aeed8 h1:aNNifhc6xCjXKejjiNYtJJLFNMXnoDiXxkJIg1JErQE=
|
||||
github.com/pingcap/kvproto v0.0.0-20241120022153-92b0414aeed8/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
|
||||
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
|
||||
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
|
||||
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfUnQGqft0ud+xVFuCdp1XkVL0X1E=
|
||||
|
||||
Reference in New Issue
Block a user