From c3feea5900724b14fafda024ab98cf39446981d7 Mon Sep 17 00:00:00 2001 From: Daniel Moran Date: Fri, 9 Jul 2021 15:36:44 -0400 Subject: [PATCH] feat: update `restore` to support InfluxDB 2.0.x (#185) --- api/api_restore.gen.go | 2 +- api/contract/cli.yml | 2 +- api/gunzip.go | 26 +- clients/backup/backup.go | 38 +-- clients/backup/backup_internal_test.go | 58 ----- clients/backup/bolt.go | 12 +- clients/backup/manifest.go | 4 +- clients/backup/manifest_test.go | 4 +- clients/restore/restore.go | 233 ++++++++++++++++-- cmd/influx/restore.go | 2 + .../backup_restore}/README.md | 2 +- internal/backup_restore/api_check.go | 45 ++++ internal/backup_restore/api_check_test.go | 69 ++++++ internal/backup_restore/manifest.go | 6 +- .../backup_restore}/meta.pb.go | 8 +- .../backup_restore}/meta.proto | 0 pkg/gzip/gunzip.go | 30 +++ 17 files changed, 380 insertions(+), 161 deletions(-) rename {clients/backup/internal => internal/backup_restore}/README.md (92%) create mode 100644 internal/backup_restore/api_check.go create mode 100644 internal/backup_restore/api_check_test.go rename {clients/backup/internal => internal/backup_restore}/meta.pb.go (99%) rename {clients/backup/internal => internal/backup_restore}/meta.proto (100%) create mode 100644 pkg/gzip/gunzip.go diff --git a/api/api_restore.gen.go b/api/api_restore.gen.go index 0fb644c..5e1a02a 100644 --- a/api/api_restore.gen.go +++ b/api/api_restore.gen.go @@ -193,7 +193,7 @@ func (a *RestoreApiService) PostRestoreBucketIDExecute(r ApiPostRestoreBucketIDR return localVarReturnValue, GenericOpenAPIError{error: err.Error()} } - localVarPath := localBasePath + "/restore/bucket/{bucketID}" + localVarPath := localBasePath + "/restore/buckets/{bucketID}" localVarPath = strings.Replace(localVarPath, "{"+"bucketID"+"}", _neturl.PathEscape(parameterToString(r.bucketID, "")), -1) localVarHeaderParams := make(map[string]string) diff --git a/api/contract/cli.yml b/api/contract/cli.yml index 83338f0..c9a52ac 100644 --- a/api/contract/cli.yml +++ b/api/contract/cli.yml @@ -67,7 +67,7 @@ paths: $ref: "./openapi/src/oss/paths/restore_kv.yml" /restore/sql: $ref: "./openapi/src/oss/paths/restore_sql.yml" - /restore/bucket/{bucketID}: + /restore/buckets/{bucketID}: $ref: "./openapi/src/oss/paths/restore_bucket_bucketID.yml" /restore/bucket-metadata: $ref: "./openapi/src/oss/paths/restore_bucket-metadata.yml" diff --git a/api/gunzip.go b/api/gunzip.go index aaa5e93..5c2af6e 100644 --- a/api/gunzip.go +++ b/api/gunzip.go @@ -1,34 +1,20 @@ package api import ( - "compress/gzip" "io" "net/http" + + "github.com/influxdata/influx-cli/v2/pkg/gzip" ) func GunzipIfNeeded(resp *http.Response) (io.ReadCloser, error) { if resp.Header.Get("Content-Encoding") == "gzip" { - gzr, err := gzip.NewReader(resp.Body) + reader, err := gzip.NewGunzipReadCloser(resp.Body) if err != nil { - return resp.Body, err + resp.Body.Close() + return nil, err } - return &gunzipReadCloser{underlying: resp.Body, gunzip: gzr}, nil + return reader, nil } return resp.Body, nil } - -type gunzipReadCloser struct { - underlying io.ReadCloser - gunzip io.ReadCloser -} - -func (gzrc *gunzipReadCloser) Read(p []byte) (int, error) { - return gzrc.gunzip.Read(p) -} - -func (gzrc *gunzipReadCloser) Close() error { - if err := gzrc.gunzip.Close(); err != nil { - return err - } - return gzrc.underlying.Close() -} diff --git a/clients/backup/backup.go b/clients/backup/backup.go index 40edc5c..d38a8d6 100644 --- a/clients/backup/backup.go +++ b/clients/backup/backup.go @@ -11,8 +11,6 @@ import ( "mime/multipart" "os" "path/filepath" - "regexp" - "strconv" "time" "github.com/influxdata/influx-cli/v2/api" @@ -74,7 +72,7 @@ func (c *Client) Backup(ctx context.Context, params *Params) error { c.baseName = time.Now().UTC().Format(backupFilenamePattern) // The APIs we use to back up metadata depends on the server's version. - legacyServer, err := c.serverIsLegacy(ctx) + legacyServer, err := br.ServerIsLegacy(ctx, c.HealthApi) if err != nil { return err } @@ -96,40 +94,6 @@ func (c *Client) Backup(ctx context.Context, params *Params) error { return nil } -var semverRegex = regexp.MustCompile(`(\d+)\.(\d+)\.(\d+).*`) - -// serverIsLegacy checks if the InfluxDB server targeted by the backup is running v2.0.x, -// which used different APIs for backups. -func (c Client) serverIsLegacy(ctx context.Context) (bool, error) { - res, err := c.GetHealth(ctx).Execute() - if err != nil { - return false, fmt.Errorf("API compatibility check failed: %w", err) - } - var version string - if res.Version != nil { - version = *res.Version - } - - matches := semverRegex.FindSubmatch([]byte(version)) - if matches == nil { - // Assume non-semver versions are only reported by nightlies & dev builds, which - // should now support the new APIs. - log.Printf("WARN: Couldn't parse version %q reported by server, assuming latest backup APIs are supported", version) - return false, nil - } - // matches[0] is the entire matched string, capture groups start at 1. - majorStr, minorStr := matches[1], matches[2] - // Ignore the err values here because the regex-match ensures we can parse the captured - // groups as integers. - major, _ := strconv.Atoi(string(majorStr)) - minor, _ := strconv.Atoi(string(minorStr)) - - if major < 2 { - return false, fmt.Errorf("InfluxDB v%d does not support the APIs required for backups", major) - } - return minor == 0, nil -} - // downloadMetadata downloads a snapshot of the KV store, SQL DB, and bucket // manifests from the server. KV and SQL are written to local files. Bucket manifests // are parsed into a slice for additional processing. diff --git a/clients/backup/backup_internal_test.go b/clients/backup/backup_internal_test.go index 2269163..643a2fa 100644 --- a/clients/backup/backup_internal_test.go +++ b/clients/backup/backup_internal_test.go @@ -337,61 +337,3 @@ func (e *notFoundErr) Error() string { func (e *notFoundErr) ErrorCode() api.ErrorCode { return api.ERRORCODE_NOT_FOUND } - -func TestBackup_ServerIsLegacy(t *testing.T) { - t.Parallel() - - testCases := []struct { - name string - versionStr *string - legacy bool - wantErr string - }{ - { - name: "2.0.x", - versionStr: api.PtrString("2.0.7"), - legacy: true, - }, - { - name: "2.1.x", - versionStr: api.PtrString("2.1.0-RC1"), - }, - { - name: "nightly", - versionStr: api.PtrString("nightly-2020-01-01"), - }, - { - name: "dev", - versionStr: api.PtrString("some.custom-version.2"), - }, - { - name: "1.x", - versionStr: api.PtrString("1.9.3"), - wantErr: "InfluxDB v1 does not support the APIs", - }, - } - - for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - ctrl := gomock.NewController(t) - healthApi := mock.NewMockHealthApi(ctrl) - healthApi.EXPECT().GetHealth(gomock.Any()).Return(api.ApiGetHealthRequest{ApiService: healthApi}) - healthApi.EXPECT().GetHealthExecute(gomock.Any()).Return(api.HealthCheck{Version: tc.versionStr}, nil) - - client := Client{HealthApi: healthApi} - isLegacy, err := client.serverIsLegacy(context.Background()) - - if tc.wantErr != "" { - require.Error(t, err) - require.Contains(t, err.Error(), tc.wantErr) - return - } - - require.NoError(t, err) - require.Equal(t, tc.legacy, isLegacy) - }) - } -} diff --git a/clients/backup/bolt.go b/clients/backup/bolt.go index 9be91e9..804e9c4 100644 --- a/clients/backup/bolt.go +++ b/clients/backup/bolt.go @@ -8,7 +8,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/influxdata/influx-cli/v2/api" - "github.com/influxdata/influx-cli/v2/clients/backup/internal" + br "github.com/influxdata/influx-cli/v2/internal/backup_restore" "go.etcd.io/bbolt" ) @@ -160,7 +160,7 @@ func extractBucketManifest(boltPath string) ([]api.BucketMetadataManifest, error return errors.New("v1 database info not found in local KV store") } - var pb internal.Data + var pb br.Data if err := proto.Unmarshal(fullMeta, &pb); err != nil { return fmt.Errorf("failed to unmarshal v1 database info: %w", err) } @@ -193,7 +193,7 @@ func extractBucketManifest(boltPath string) ([]api.BucketMetadataManifest, error return manifests, nil } -func unmarshalRawDBI(rawDBI internal.DatabaseInfo) influxdbV1DatabaseInfo { +func unmarshalRawDBI(rawDBI br.DatabaseInfo) influxdbV1DatabaseInfo { dbi := influxdbV1DatabaseInfo{ Name: rawDBI.GetName(), DefaultRetentionPolicy: rawDBI.GetDefaultRetentionPolicy(), @@ -208,7 +208,7 @@ func unmarshalRawDBI(rawDBI internal.DatabaseInfo) influxdbV1DatabaseInfo { return dbi } -func unmarshalRawRPI(rawRPI internal.RetentionPolicyInfo) influxdbV1RetentionPolicyInfo { +func unmarshalRawRPI(rawRPI br.RetentionPolicyInfo) influxdbV1RetentionPolicyInfo { rpi := influxdbV1RetentionPolicyInfo{ Name: rawRPI.GetName(), ReplicaN: int32(rawRPI.GetReplicaN()), @@ -236,7 +236,7 @@ func unmarshalRawRPI(rawRPI internal.RetentionPolicyInfo) influxdbV1RetentionPol return rpi } -func unmarshalRawSGI(rawSGI internal.ShardGroupInfo) influxdbV1ShardGroupInfo { +func unmarshalRawSGI(rawSGI br.ShardGroupInfo) influxdbV1ShardGroupInfo { sgi := influxdbV1ShardGroupInfo{ ID: int64(rawSGI.GetID()), StartTime: time.Unix(0, rawSGI.GetStartTime()).UTC(), @@ -254,7 +254,7 @@ func unmarshalRawSGI(rawSGI internal.ShardGroupInfo) influxdbV1ShardGroupInfo { return sgi } -func unmarshalRawShard(rawShard internal.ShardInfo) influxdbV1ShardInfo { +func unmarshalRawShard(rawShard br.ShardInfo) influxdbV1ShardInfo { si := influxdbV1ShardInfo{ ID: int64(rawShard.GetID()), } diff --git a/clients/backup/manifest.go b/clients/backup/manifest.go index fa8aab2..e88bdc9 100644 --- a/clients/backup/manifest.go +++ b/clients/backup/manifest.go @@ -94,12 +94,12 @@ func ConvertShard(manifest api.ShardManifest, getShard func(shardId int64) (*br. m := br.ManifestShardEntry{ ID: manifest.Id, - ShardOwners: make([]br.ShardOwner, len(manifest.ShardOwners)), + ShardOwners: make([]br.ShardOwnerEntry, len(manifest.ShardOwners)), ManifestFileEntry: *shardFileInfo, } for i, o := range manifest.ShardOwners { - m.ShardOwners[i] = br.ShardOwner{ + m.ShardOwners[i] = br.ShardOwnerEntry{ NodeID: o.NodeID, } } diff --git a/clients/backup/manifest_test.go b/clients/backup/manifest_test.go index f188fdf..8b5f45c 100644 --- a/clients/backup/manifest_test.go +++ b/clients/backup/manifest_test.go @@ -123,7 +123,7 @@ func TestConvertBucketManifest(t *testing.T) { Shards: []br.ManifestShardEntry{ { ID: 10, - ShardOwners: []br.ShardOwner{{NodeID: 1}}, + ShardOwners: []br.ShardOwnerEntry{{NodeID: 1}}, ManifestFileEntry: br.ManifestFileEntry{ FileName: "10.gz", Size: 1000, @@ -140,7 +140,7 @@ func TestConvertBucketManifest(t *testing.T) { Shards: []br.ManifestShardEntry{ { ID: 30, - ShardOwners: []br.ShardOwner{}, + ShardOwners: []br.ShardOwnerEntry{}, ManifestFileEntry: br.ManifestFileEntry{ FileName: "30.gz", Size: 3000, diff --git a/clients/restore/restore.go b/clients/restore/restore.go index 3d9d349..d2153dd 100644 --- a/clients/restore/restore.go +++ b/clients/restore/restore.go @@ -10,7 +10,9 @@ import ( "path/filepath" "sort" "strings" + "time" + "github.com/gogo/protobuf/proto" "github.com/influxdata/influx-cli/v2/api" "github.com/influxdata/influx-cli/v2/clients" br "github.com/influxdata/influx-cli/v2/internal/backup_restore" @@ -19,7 +21,9 @@ import ( type Client struct { clients.CLI + api.HealthApi api.RestoreApi + api.BucketsApi api.OrganizationsApi manifest br.Manifest @@ -72,10 +76,17 @@ func (c *Client) Restore(ctx context.Context, params *Params) error { if err := c.loadManifests(params.Path); err != nil { return err } - if params.Full { - return c.fullRestore(ctx, params.Path) + + // The APIs we use to restore data depends on the server's version. + legacyServer, err := br.ServerIsLegacy(ctx, c.HealthApi) + if err != nil { + return err } - return c.partialRestore(ctx, params) + + if params.Full { + return c.fullRestore(ctx, params.Path, legacyServer) + } + return c.partialRestore(ctx, params, legacyServer) } // loadManifests finds and merges all backup manifests stored in a given directory, @@ -124,9 +135,14 @@ func (c *Client) loadManifests(path string) error { return nil } -func (c Client) fullRestore(ctx context.Context, path string) error { +// fullRestore completely replaces all metadata and data on the server with the contents of a local backup. +func (c Client) fullRestore(ctx context.Context, path string, legacy bool) error { + if legacy && c.manifest.SQL != nil { + return fmt.Errorf("cannot fully restore data from %s: target server's version too old to restore SQL metadata", path) + } + // Make sure we can read both local metadata snapshots before - kvBytes, err := c.readFileGzipped(path, c.manifest.KV) + kvBytes, err := readFileGzipped(path, c.manifest.KV) if err != nil { return fmt.Errorf("failed to open local KV backup at %q: %w", filepath.Join(path, c.manifest.KV.FileName), err) } @@ -134,7 +150,7 @@ func (c Client) fullRestore(ctx context.Context, path string) error { var sqlBytes io.ReadCloser if c.manifest.SQL != nil { - sqlBytes, err = c.readFileGzipped(path, *c.manifest.SQL) + sqlBytes, err = readFileGzipped(path, *c.manifest.SQL) if err != nil { return fmt.Errorf("failed to open local SQL backup at %q: %w", filepath.Join(path, c.manifest.SQL.FileName), err) } @@ -168,7 +184,7 @@ func (c Client) fullRestore(ctx context.Context, path string) error { for _, rp := range b.RetentionPolicies { for _, sg := range rp.ShardGroups { for _, s := range sg.Shards { - if err := c.restoreShard(ctx, path, s); err != nil { + if err := c.restoreShard(ctx, path, s, legacy); err != nil { return err } } @@ -179,7 +195,9 @@ func (c Client) fullRestore(ctx context.Context, path string) error { return nil } -func (c Client) partialRestore(ctx context.Context, params *Params) (err error) { +// partialRestore creates a bucket (or buckets) on the target server, and seeds it with data +// from a local backup. +func (c Client) partialRestore(ctx context.Context, params *Params, legacy bool) (err error) { orgIds := map[string]string{} for _, bkt := range c.manifest.Buckets { @@ -216,19 +234,15 @@ func (c Client) partialRestore(ctx context.Context, params *Params) (err error) bkt.BucketName = params.NewBucketName } - log.Printf("INFO: Restoring bucket %q as %q\n", bkt.BucketID, bkt.BucketName) - bucketMapping, err := c.PostRestoreBucketMetadata(ctx). - BucketMetadataManifest(ConvertBucketManifest(bkt)). - Execute() + restoreBucket := c.restoreBucket + if legacy { + restoreBucket = c.restoreBucketLegacy + } + shardIdMap, err := restoreBucket(ctx, bkt) if err != nil { return fmt.Errorf("failed to restore bucket %q: %w", bkt.BucketName, err) } - shardIdMap := make(map[int64]int64, len(bucketMapping.ShardMappings)) - for _, mapping := range bucketMapping.ShardMappings { - shardIdMap[mapping.OldId] = mapping.NewId - } - for _, rp := range bkt.RetentionPolicies { for _, sg := range rp.ShardGroups { for _, sh := range sg.Shards { @@ -238,7 +252,7 @@ func (c Client) partialRestore(ctx context.Context, params *Params) (err error) continue } sh.ID = newID - if err := c.restoreShard(ctx, params.Path, sh); err != nil { + if err := c.restoreShard(ctx, params.Path, sh, legacy); err != nil { return err } } @@ -249,6 +263,67 @@ func (c Client) partialRestore(ctx context.Context, params *Params) (err error) return nil } +// restoreBucket creates a new bucket and pre-generates a set of shards within that bucket, returning +// a mapping between the shard IDs stored in a local backup and the new shard IDs generated on the server. +func (c Client) restoreBucket(ctx context.Context, bkt br.ManifestBucketEntry) (map[int64]int64, error) { + log.Printf("INFO: Restoring bucket %q as %q\n", bkt.BucketID, bkt.BucketName) + bucketMapping, err := c.PostRestoreBucketMetadata(ctx). + BucketMetadataManifest(ConvertBucketManifest(bkt)). + Execute() + if err != nil { + return nil, err + } + + shardIdMap := make(map[int64]int64, len(bucketMapping.ShardMappings)) + for _, mapping := range bucketMapping.ShardMappings { + shardIdMap[mapping.OldId] = mapping.NewId + } + return shardIdMap, nil +} + +// restoreBucketLegacy creates a new bucket and pre-generates a set of shards within that bucket, returning +// a mapping between the shard IDs stored in a local backup and the new shard IDs generated on the server. +// +// The server-side logic to do all this was introduced in v2.1.0. To support using newer CLI versions against +// v2.0.x of the server, we replicate the logic here via multiple API calls. +func (c Client) restoreBucketLegacy(ctx context.Context, bkt br.ManifestBucketEntry) (map[int64]int64, error) { + log.Printf("INFO: Restoring bucket %q as %q using legacy APIs\n", bkt.BucketID, bkt.BucketName) + // Legacy APIs require creating the bucket as a separate call. + rps := make([]api.RetentionRule, len(bkt.RetentionPolicies)) + for i, rp := range bkt.RetentionPolicies { + rps[i] = *api.NewRetentionRuleWithDefaults() + rps[i].EverySeconds = int64(time.Duration(rp.Duration).Seconds()) + sgd := int64(time.Duration(rp.ShardGroupDuration).Seconds()) + rps[i].ShardGroupDurationSeconds = &sgd + } + + bucketReq := *api.NewPostBucketRequest(bkt.OrganizationID, bkt.BucketName, rps) + bucketReq.Description = bkt.Description + + newBkt, err := c.PostBuckets(ctx).PostBucketRequest(bucketReq).Execute() + if err != nil { + return nil, fmt.Errorf("couldn't create bucket %q: %w", bkt.BucketName, err) + } + + dbi := bucketToDBI(bkt) + dbiBytes, err := proto.Marshal(&dbi) + if err != nil { + return nil, err + } + + shardMapJSON, err := c.PostRestoreBucketID(ctx, *newBkt.Id).Body(dbiBytes).Execute() + if err != nil { + return nil, fmt.Errorf("couldn't restore database info for %q: %w", bkt.BucketName, err) + } + + var shardMap map[int64]int64 + if err := json.Unmarshal([]byte(shardMapJSON), &shardMap); err != nil { + return nil, fmt.Errorf("couldn't parse result of restoring database info for %q: %w", bkt.BucketName, err) + } + + return shardMap, nil +} + // restoreOrg gets the ID for the org with the given name, creating the org if it doesn't already exist. func (c Client) restoreOrg(ctx context.Context, name string) (string, error) { // NOTE: Our orgs API returns a 404 instead of an empty list when filtering by a specific name. @@ -272,9 +347,88 @@ func (c Client) restoreOrg(ctx context.Context, name string) (string, error) { return *orgs.GetOrgs()[0].Id, nil } +func bucketToDBI(b br.ManifestBucketEntry) br.DatabaseInfo { + dbi := br.DatabaseInfo{ + Name: &b.BucketID, + DefaultRetentionPolicy: &b.DefaultRetentionPolicy, + RetentionPolicies: make([]*br.RetentionPolicyInfo, len(b.RetentionPolicies)), + ContinuousQueries: nil, + } + for i, rp := range b.RetentionPolicies { + converted := retentionPolicyToRPI(rp) + dbi.RetentionPolicies[i] = &converted + } + return dbi +} + +func retentionPolicyToRPI(rp br.ManifestRetentionPolicy) br.RetentionPolicyInfo { + replicaN := uint32(rp.ReplicaN) + rpi := br.RetentionPolicyInfo{ + Name: &rp.Name, + Duration: &rp.Duration, + ShardGroupDuration: &rp.ShardGroupDuration, + ReplicaN: &replicaN, + ShardGroups: make([]*br.ShardGroupInfo, len(rp.ShardGroups)), + Subscriptions: make([]*br.SubscriptionInfo, len(rp.Subscriptions)), + } + for i, sg := range rp.ShardGroups { + converted := shardGroupToSGI(sg) + rpi.ShardGroups[i] = &converted + } + for i, s := range rp.Subscriptions { + converted := br.SubscriptionInfo{ + Name: &s.Name, + Mode: &s.Mode, + Destinations: s.Destinations, + } + rpi.Subscriptions[i] = &converted + } + return rpi +} + +func shardGroupToSGI(sg br.ManifestShardGroup) br.ShardGroupInfo { + id := uint64(sg.ID) + start := sg.StartTime.UnixNano() + end := sg.EndTime.UnixNano() + var deleted, truncated int64 + if sg.DeletedAt != nil { + deleted = sg.DeletedAt.UnixNano() + } + if sg.TruncatedAt != nil { + truncated = sg.TruncatedAt.UnixNano() + } + sgi := br.ShardGroupInfo{ + ID: &id, + StartTime: &start, + EndTime: &end, + DeletedAt: &deleted, + Shards: make([]*br.ShardInfo, len(sg.Shards)), + TruncatedAt: &truncated, + } + for i, s := range sg.Shards { + converted := shardToSI(s) + sgi.Shards[i] = &converted + } + return sgi +} + +func shardToSI(shard br.ManifestShardEntry) br.ShardInfo { + id := uint64(shard.ID) + si := br.ShardInfo{ + ID: &id, + Owners: make([]*br.ShardOwner, len(shard.ShardOwners)), + } + for i, o := range shard.ShardOwners { + oid := uint64(o.NodeID) + converted := br.ShardOwner{NodeID: &oid} + si.Owners[i] = &converted + } + return si +} + // readFileGzipped opens a local file and returns a reader of its contents, // compressed with gzip. -func (c Client) readFileGzipped(path string, file br.ManifestFileEntry) (io.ReadCloser, error) { +func readFileGzipped(path string, file br.ManifestFileEntry) (io.ReadCloser, error) { fullPath := filepath.Join(path, file.FileName) f, err := os.Open(fullPath) if err != nil { @@ -286,20 +440,47 @@ func (c Client) readFileGzipped(path string, file br.ManifestFileEntry) (io.Read return gzip.NewGzipPipe(f), nil } -func (c Client) restoreShard(ctx context.Context, path string, m br.ManifestShardEntry) error { +// readFileGunzipped opens a local file and returns a reader of its contents, +// gunzipping it if it is compressed. +func readFileGunzipped(path string, file br.ManifestFileEntry) (io.ReadCloser, error) { + fullPath := filepath.Join(path, file.FileName) + f, err := os.Open(fullPath) + if err != nil { + return nil, err + } + if file.Compression == br.NoCompression { + return f, nil + } + reader, err := gzip.NewGunzipReadCloser(f) + if err != nil { + f.Close() + return nil, err + } + return reader, nil +} + +// restoreShard overwrites the contents of a single shard on the server using TSM stored in a local backup. +func (c Client) restoreShard(ctx context.Context, path string, m br.ManifestShardEntry, legacy bool) error { + read := readFileGzipped + if legacy { + // The legacy API didn't support gzipped uploads. + read = readFileGunzipped + } // Make sure we can read the local snapshot. - tsmBytes, err := c.readFileGzipped(path, m.ManifestFileEntry) + tsmBytes, err := read(path, m.ManifestFileEntry) if err != nil { return fmt.Errorf("failed to open local TSM snapshot at %q: %w", filepath.Join(path, m.FileName), err) } defer tsmBytes.Close() - log.Printf("INFO: Restoring TSM snapshot for shard %d\n", m.ID) - if err := c.PostRestoreShardId(ctx, fmt.Sprintf("%d", m.ID)). - ContentEncoding("gzip"). + req := c.PostRestoreShardId(ctx, fmt.Sprintf("%d", m.ID)). ContentType("application/octet-stream"). - Body(tsmBytes). - Execute(); err != nil { + Body(tsmBytes) + if !legacy { + req = req.ContentEncoding("gzip") + } + log.Printf("INFO: Restoring TSM snapshot for shard %d\n", m.ID) + if err := req.Execute(); err != nil { return fmt.Errorf("failed to restore TSM snapshot for shard %d: %w", m.ID, err) } return nil diff --git a/cmd/influx/restore.go b/cmd/influx/restore.go index a703df7..7e350b3 100644 --- a/cmd/influx/restore.go +++ b/cmd/influx/restore.go @@ -87,7 +87,9 @@ Examples: api := getAPI(ctx) client := restore.Client{ CLI: getCLI(ctx), + HealthApi: api.HealthApi.OnlyOSS(), RestoreApi: api.RestoreApi.OnlyOSS(), + BucketsApi: api.BucketsApi, OrganizationsApi: api.OrganizationsApi, } return client.Restore(getContext(ctx), ¶ms) diff --git a/clients/backup/internal/README.md b/internal/backup_restore/README.md similarity index 92% rename from clients/backup/internal/README.md rename to internal/backup_restore/README.md index f3c4b34..3b18429 100644 --- a/clients/backup/internal/README.md +++ b/internal/backup_restore/README.md @@ -9,4 +9,4 @@ and into this repository. This file isn't intended to be modified. If `meta.pb.go` ever needs to be re-generated, follow these steps: 1. Install `protoc` (i.e. via `brew install protobuf`) 2. Run `go install github.com/gogo/protobuf/protoc-gen-gogo` from within this repository -3. Run `go generate ` +3. Run `go generate ` diff --git a/internal/backup_restore/api_check.go b/internal/backup_restore/api_check.go new file mode 100644 index 0000000..2bda3f5 --- /dev/null +++ b/internal/backup_restore/api_check.go @@ -0,0 +1,45 @@ +package backup_restore + +import ( + "context" + "fmt" + "log" + "regexp" + "strconv" + + "github.com/influxdata/influx-cli/v2/api" +) + +var semverRegex = regexp.MustCompile(`(\d+)\.(\d+)\.(\d+).*`) + +// ServerIsLegacy checks if the InfluxDB server targeted by the backup is running v2.0.x, +// which used different APIs for backups. +func ServerIsLegacy(ctx context.Context, client api.HealthApi) (bool, error) { + res, err := client.GetHealth(ctx).Execute() + if err != nil { + return false, fmt.Errorf("API compatibility check failed: %w", err) + } + var version string + if res.Version != nil { + version = *res.Version + } + + matches := semverRegex.FindSubmatch([]byte(version)) + if matches == nil { + // Assume non-semver versions are only reported by nightlies & dev builds, which + // should now support the new APIs. + log.Printf("WARN: Couldn't parse version %q reported by server, assuming latest backup/restore APIs are supported", version) + return false, nil + } + // matches[0] is the entire matched string, capture groups start at 1. + majorStr, minorStr := matches[1], matches[2] + // Ignore the err values here because the regex-match ensures we can parse the captured + // groups as integers. + major, _ := strconv.Atoi(string(majorStr)) + minor, _ := strconv.Atoi(string(minorStr)) + + if major < 2 { + return false, fmt.Errorf("InfluxDB v%d does not support the APIs required for backup/restore", major) + } + return minor == 0, nil +} diff --git a/internal/backup_restore/api_check_test.go b/internal/backup_restore/api_check_test.go new file mode 100644 index 0000000..25a9265 --- /dev/null +++ b/internal/backup_restore/api_check_test.go @@ -0,0 +1,69 @@ +package backup_restore_test + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/influxdata/influx-cli/v2/api" + "github.com/influxdata/influx-cli/v2/internal/backup_restore" + "github.com/influxdata/influx-cli/v2/internal/mock" + "github.com/stretchr/testify/require" +) + +func TestServerIsLegacy(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + versionStr *string + legacy bool + wantErr string + }{ + { + name: "2.0.x", + versionStr: api.PtrString("2.0.7"), + legacy: true, + }, + { + name: "2.1.x", + versionStr: api.PtrString("2.1.0-RC1"), + }, + { + name: "nightly", + versionStr: api.PtrString("nightly-2020-01-01"), + }, + { + name: "dev", + versionStr: api.PtrString("some.custom-version.2"), + }, + { + name: "1.x", + versionStr: api.PtrString("1.9.3"), + wantErr: "InfluxDB v1 does not support the APIs", + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + healthApi := mock.NewMockHealthApi(ctrl) + healthApi.EXPECT().GetHealth(gomock.Any()).Return(api.ApiGetHealthRequest{ApiService: healthApi}) + healthApi.EXPECT().GetHealthExecute(gomock.Any()).Return(api.HealthCheck{Version: tc.versionStr}, nil) + + isLegacy, err := backup_restore.ServerIsLegacy(context.Background(), healthApi) + + if tc.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.wantErr) + return + } + + require.NoError(t, err) + require.Equal(t, tc.legacy, isLegacy) + }) + } +} diff --git a/internal/backup_restore/manifest.go b/internal/backup_restore/manifest.go index d753c1d..b084ef8 100644 --- a/internal/backup_restore/manifest.go +++ b/internal/backup_restore/manifest.go @@ -78,12 +78,12 @@ type ManifestShardGroup struct { } type ManifestShardEntry struct { - ID int64 `json:"id"` - ShardOwners []ShardOwner `json:"shardOwners"` + ID int64 `json:"id"` + ShardOwners []ShardOwnerEntry `json:"shardOwners"` ManifestFileEntry } -type ShardOwner struct { +type ShardOwnerEntry struct { NodeID int64 `json:"nodeID"` } diff --git a/clients/backup/internal/meta.pb.go b/internal/backup_restore/meta.pb.go similarity index 99% rename from clients/backup/internal/meta.pb.go rename to internal/backup_restore/meta.pb.go index ec53cf5..87fc26b 100644 --- a/clients/backup/internal/meta.pb.go +++ b/internal/backup_restore/meta.pb.go @@ -1,13 +1,13 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. // source: internal/meta.proto -package internal +package backup_restore import ( - fmt "fmt" - math "math" + "fmt" + "math" - proto "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/proto" ) // Reference imports to suppress errors if they are not otherwise used. diff --git a/clients/backup/internal/meta.proto b/internal/backup_restore/meta.proto similarity index 100% rename from clients/backup/internal/meta.proto rename to internal/backup_restore/meta.proto diff --git a/pkg/gzip/gunzip.go b/pkg/gzip/gunzip.go new file mode 100644 index 0000000..3fc7a4a --- /dev/null +++ b/pkg/gzip/gunzip.go @@ -0,0 +1,30 @@ +package gzip + +import ( + "compress/gzip" + "io" +) + +func NewGunzipReadCloser(in io.ReadCloser) (*gunzipReadCloser, error) { + gzr, err := gzip.NewReader(in) + if err != nil { + return nil, err + } + return &gunzipReadCloser{underlying: in, gunzip: gzr}, nil +} + +type gunzipReadCloser struct { + underlying io.ReadCloser + gunzip io.ReadCloser +} + +func (gzrc *gunzipReadCloser) Read(p []byte) (int, error) { + return gzrc.gunzip.Read(p) +} + +func (gzrc *gunzipReadCloser) Close() error { + if err := gzrc.gunzip.Close(); err != nil { + return err + } + return gzrc.underlying.Close() +}