diff --git a/clients/backup/backup.go b/clients/backup/backup.go index d38a8d6..6df8f26 100644 --- a/clients/backup/backup.go +++ b/clients/backup/backup.go @@ -70,6 +70,7 @@ func (c *Client) Backup(ctx context.Context, params *Params) error { return err } c.baseName = time.Now().UTC().Format(backupFilenamePattern) + c.manifest.Version = br.ManifestVersion // The APIs we use to back up metadata depends on the server's version. legacyServer, err := br.ServerIsLegacy(ctx, c.HealthApi) @@ -228,7 +229,7 @@ func (c *Client) downloadMetadataLegacy(ctx context.Context, params *Params) err // Extract the metadata we need from the downloaded KV store, and convert it to a new-style manifest. log.Println("INFO: Extracting bucket manifest from legacy KV snapshot") - c.bucketMetadata, err = extractBucketManifest(tmpKv) + c.bucketMetadata, err = br.ExtractBucketMetadata(tmpKv) if err != nil { return fmt.Errorf("failed to extract bucket metadata from downloaded KV snapshot: %w", err) } @@ -283,7 +284,7 @@ func (c *Client) downloadBucketData(ctx context.Context, params *Params) error { if !params.matches(b) { continue } - bktManifest, err := ConvertBucketManifest(b, func(shardId int64) (*br.ManifestFileEntry, error) { + bktManifest, err := br.ConvertBucketManifest(b, func(shardId int64) (*br.ManifestFileEntry, error) { return c.downloadShardData(ctx, params, shardId) }) if err != nil { diff --git a/clients/backup/bolt.go b/clients/backup/bolt.go deleted file mode 100644 index 804e9c4..0000000 --- a/clients/backup/bolt.go +++ /dev/null @@ -1,349 +0,0 @@ -package backup - -import ( - "encoding/json" - "errors" - "fmt" - "time" - - "github.com/gogo/protobuf/proto" - "github.com/influxdata/influx-cli/v2/api" - br "github.com/influxdata/influx-cli/v2/internal/backup_restore" - "go.etcd.io/bbolt" -) - -//go:generate protoc --gogo_out=. internal/meta.proto - -// NOTE: An unfortunate naming collision below. Bolt calls its databases "buckets". -// These are the names that were used in the metadata DB for 2.0.x versions of influxdb. -var ( - bucketsBoltBucket = []byte("bucketsv1") - organizationsBoltBucket = []byte("organizationsv1") - v1MetadataBoltBucket = []byte("v1_tsm1_metadata") - v1MetadataBoltKey = []byte("meta.db") -) - -// influxdbBucketSchema models the JSON structure used by InfluxDB 2.0.x to serialize -// bucket metadata in the embedded KV store. -type influxdbBucketSchema struct { - ID string `json:"id"` - OrgID string `json:"orgID"` - Type int `json:"type"` - Name string `json:"name"` - Description *string `json:"description,omitempty"` - RetentionPeriod time.Duration `json:"retentionPeriod"` - ShardGroupDuration time.Duration `json:"ShardGroupDuration"` -} - -// influxdbOrganizationSchema models the JSON structure used by InfluxDB 2.0.x to serialize -// organization metadata in the embedded KV store. -type influxdbOrganizationSchema struct { - ID string `json:"id"` - Name string `json:"name"` -} - -// influxdbV1DatabaseInfo models the protobuf structure used by InfluxDB 2.0.x to serialize -// database info in the embedded KV store. -type influxdbV1DatabaseInfo struct { - Name string - DefaultRetentionPolicy string - RetentionPolicies []influxdbV1RetentionPolicyInfo -} - -// influxdbV1RetentionPolicyInfo models the protobuf structure used by InfluxDB 2.0.x to serialize -// retention-policy info in the embedded KV store. -type influxdbV1RetentionPolicyInfo struct { - Name string - ReplicaN int32 - Duration int64 - ShardGroupDuration int64 - ShardGroups []influxdbV1ShardGroupInfo - Subscriptions []influxdbV1SubscriptionInfo -} - -// influxdbV1ShardGroupInfo models the protobuf structure used by InfluxDB 2.0.x to serialize -// shard-group info in the embedded KV store. -type influxdbV1ShardGroupInfo struct { - ID int64 - StartTime time.Time - EndTime time.Time - DeletedAt time.Time - Shards []influxdbV1ShardInfo - TruncatedAt time.Time -} - -// influxdbV1ShardInfo models the protobuf structure used by InfluxDB 2.0.x to serialize -// shard info in the embedded KV store. -type influxdbV1ShardInfo struct { - ID int64 - Owners []influxdbV1ShardOwnerInfo -} - -// inflxudbV1ShardOwnerInfo models the protobuf structure used by InfluxDB 2.0.x to serialize -// shard-owner info in the embedded KV store. -type influxdbV1ShardOwnerInfo struct { - NodeID int64 -} - -// influxdbV1SubscriptionInfo models the protobuf structure used by InfluxDB 2.0.x to serialize -// subscription info in the embedded KV store. -type influxdbV1SubscriptionInfo struct { - Name string - Mode string - Destinations []string -} - -// extractBucketManifest reads a boltdb backed up from InfluxDB 2.0.x, converting a subset of the -// metadata it contains into a set of 2.1.x bucket manifests. -func extractBucketManifest(boltPath string) ([]api.BucketMetadataManifest, error) { - db, err := bbolt.Open(boltPath, 0666, &bbolt.Options{ReadOnly: true, Timeout: 1 * time.Second}) - if err != nil { - // Hack to give a slightly nicer error message for a known failure mode when bolt calls - // mmap on a file system that doesn't support the MAP_SHARED option. - // - // See: https://github.com/boltdb/bolt/issues/272 - // See: https://stackoverflow.com/a/18421071 - if err.Error() == "invalid argument" { - return nil, fmt.Errorf("unable to open boltdb: mmap of %q may not support the MAP_SHARED option", boltPath) - } - - return nil, fmt.Errorf("unable to open boltdb: %w", err) - } - defer db.Close() - - // Read raw metadata needed to construct a manifest. - var buckets []influxdbBucketSchema - orgNamesById := map[string]string{} - dbInfoByBucketId := map[string]influxdbV1DatabaseInfo{} - - if err := db.View(func(tx *bbolt.Tx) error { - bucketDB := tx.Bucket(bucketsBoltBucket) - if bucketDB == nil { - return errors.New("bucket metadata not found in local KV store") - } - - if err := bucketDB.ForEach(func(k, v []byte) error { - var b influxdbBucketSchema - if err := json.Unmarshal(v, &b); err != nil { - return err - } - if b.Type != 1 { // 1 == "system" - buckets = append(buckets, b) - } - return nil - }); err != nil { - return fmt.Errorf("failed to read bucket metadata from local KV store: %w", err) - } - - orgsDB := tx.Bucket(organizationsBoltBucket) - if orgsDB == nil { - return errors.New("organization metadata not found in local KV store") - } - - if err := orgsDB.ForEach(func(k, v []byte) error { - var o influxdbOrganizationSchema - if err := json.Unmarshal(v, &o); err != nil { - return err - } - orgNamesById[o.ID] = o.Name - return nil - }); err != nil { - return fmt.Errorf("failed to read organization metadata from local KV store: %w", err) - } - - v1DB := tx.Bucket(v1MetadataBoltBucket) - if v1DB == nil { - return errors.New("v1 database info not found in local KV store") - } - fullMeta := v1DB.Get(v1MetadataBoltKey) - if fullMeta == nil { - return errors.New("v1 database info not found in local KV store") - } - - var pb br.Data - if err := proto.Unmarshal(fullMeta, &pb); err != nil { - return fmt.Errorf("failed to unmarshal v1 database info: %w", err) - } - for _, rawDBI := range pb.GetDatabases() { - if rawDBI == nil { - continue - } - unmarshalled := unmarshalRawDBI(*rawDBI) - dbInfoByBucketId[unmarshalled.Name] = unmarshalled - } - - return nil - }); err != nil { - return nil, err - } - - manifests := make([]api.BucketMetadataManifest, len(buckets)) - for i, b := range buckets { - orgName, ok := orgNamesById[b.OrgID] - if !ok { - return nil, fmt.Errorf("local KV store in inconsistent state: no organization found with ID %q", b.OrgID) - } - dbi, ok := dbInfoByBucketId[b.ID] - if !ok { - return nil, fmt.Errorf("local KV store in inconsistent state: no V1 database info found for bucket %q", b.Name) - } - manifests[i] = combineMetadata(b, orgName, dbi) - } - - return manifests, nil -} - -func unmarshalRawDBI(rawDBI br.DatabaseInfo) influxdbV1DatabaseInfo { - dbi := influxdbV1DatabaseInfo{ - Name: rawDBI.GetName(), - DefaultRetentionPolicy: rawDBI.GetDefaultRetentionPolicy(), - RetentionPolicies: make([]influxdbV1RetentionPolicyInfo, 0, len(rawDBI.GetRetentionPolicies())), - } - for _, rp := range rawDBI.GetRetentionPolicies() { - if rp == nil { - continue - } - dbi.RetentionPolicies = append(dbi.RetentionPolicies, unmarshalRawRPI(*rp)) - } - return dbi -} - -func unmarshalRawRPI(rawRPI br.RetentionPolicyInfo) influxdbV1RetentionPolicyInfo { - rpi := influxdbV1RetentionPolicyInfo{ - Name: rawRPI.GetName(), - ReplicaN: int32(rawRPI.GetReplicaN()), - Duration: rawRPI.GetDuration(), - ShardGroupDuration: rawRPI.GetShardGroupDuration(), - ShardGroups: make([]influxdbV1ShardGroupInfo, 0, len(rawRPI.GetShardGroups())), - Subscriptions: make([]influxdbV1SubscriptionInfo, 0, len(rawRPI.GetSubscriptions())), - } - for _, sg := range rawRPI.GetShardGroups() { - if sg == nil { - continue - } - rpi.ShardGroups = append(rpi.ShardGroups, unmarshalRawSGI(*sg)) - } - for _, s := range rawRPI.GetSubscriptions() { - if s == nil { - continue - } - rpi.Subscriptions = append(rpi.Subscriptions, influxdbV1SubscriptionInfo{ - Name: s.GetName(), - Mode: s.GetMode(), - Destinations: s.GetDestinations(), - }) - } - return rpi -} - -func unmarshalRawSGI(rawSGI br.ShardGroupInfo) influxdbV1ShardGroupInfo { - sgi := influxdbV1ShardGroupInfo{ - ID: int64(rawSGI.GetID()), - StartTime: time.Unix(0, rawSGI.GetStartTime()).UTC(), - EndTime: time.Unix(0, rawSGI.GetEndTime()).UTC(), - DeletedAt: time.Unix(0, rawSGI.GetDeletedAt()).UTC(), - Shards: make([]influxdbV1ShardInfo, 0, len(rawSGI.GetShards())), - TruncatedAt: time.Unix(0, rawSGI.GetTruncatedAt()).UTC(), - } - for _, s := range rawSGI.GetShards() { - if s == nil { - continue - } - sgi.Shards = append(sgi.Shards, unmarshalRawShard(*s)) - } - return sgi -} - -func unmarshalRawShard(rawShard br.ShardInfo) influxdbV1ShardInfo { - si := influxdbV1ShardInfo{ - ID: int64(rawShard.GetID()), - } - // If deprecated "OwnerIDs" exists then convert it to "Owners" format. - //lint:ignore SA1019 we need to check for the presence of the deprecated field so we can convert it - oldStyleOwnerIds := rawShard.GetOwnerIDs() - if len(oldStyleOwnerIds) > 0 { - si.Owners = make([]influxdbV1ShardOwnerInfo, len(oldStyleOwnerIds)) - for i, oid := range oldStyleOwnerIds { - si.Owners[i] = influxdbV1ShardOwnerInfo{NodeID: int64(oid)} - } - } else { - si.Owners = make([]influxdbV1ShardOwnerInfo, 0, len(rawShard.GetOwners())) - for _, o := range rawShard.GetOwners() { - if o == nil { - continue - } - si.Owners = append(si.Owners, influxdbV1ShardOwnerInfo{NodeID: int64(o.GetNodeID())}) - } - } - return si -} - -func combineMetadata(bucket influxdbBucketSchema, orgName string, dbi influxdbV1DatabaseInfo) api.BucketMetadataManifest { - m := api.BucketMetadataManifest{ - OrganizationID: bucket.OrgID, - OrganizationName: orgName, - BucketID: bucket.ID, - BucketName: bucket.Name, - DefaultRetentionPolicy: dbi.DefaultRetentionPolicy, - RetentionPolicies: make([]api.RetentionPolicyManifest, len(dbi.RetentionPolicies)), - } - if bucket.Description != nil && *bucket.Description != "" { - m.Description = bucket.Description - } - for i, rp := range dbi.RetentionPolicies { - m.RetentionPolicies[i] = convertRPI(rp) - } - return m -} - -func convertRPI(rpi influxdbV1RetentionPolicyInfo) api.RetentionPolicyManifest { - m := api.RetentionPolicyManifest{ - Name: rpi.Name, - ReplicaN: rpi.ReplicaN, - Duration: rpi.Duration, - ShardGroupDuration: rpi.ShardGroupDuration, - ShardGroups: make([]api.ShardGroupManifest, len(rpi.ShardGroups)), - Subscriptions: make([]api.SubscriptionManifest, len(rpi.Subscriptions)), - } - for i, sg := range rpi.ShardGroups { - m.ShardGroups[i] = convertSGI(sg) - } - for i, s := range rpi.Subscriptions { - m.Subscriptions[i] = api.SubscriptionManifest{ - Name: s.Name, - Mode: s.Mode, - Destinations: s.Destinations, - } - } - return m -} - -func convertSGI(sgi influxdbV1ShardGroupInfo) api.ShardGroupManifest { - m := api.ShardGroupManifest{ - Id: sgi.ID, - StartTime: sgi.StartTime, - EndTime: sgi.EndTime, - Shards: make([]api.ShardManifest, len(sgi.Shards)), - } - if sgi.DeletedAt.Unix() != 0 { - m.DeletedAt = &sgi.DeletedAt - } - if sgi.TruncatedAt.Unix() != 0 { - m.TruncatedAt = &sgi.TruncatedAt - } - for i, s := range sgi.Shards { - m.Shards[i] = convertShard(s) - } - return m -} - -func convertShard(shard influxdbV1ShardInfo) api.ShardManifest { - m := api.ShardManifest{ - Id: shard.ID, - ShardOwners: make([]api.ShardOwner, len(shard.Owners)), - } - for i, o := range shard.Owners { - m.ShardOwners[i] = api.ShardOwner{NodeID: o.NodeID} - } - return m -} diff --git a/clients/backup/manifest.go b/clients/backup/manifest.go deleted file mode 100644 index e88bdc9..0000000 --- a/clients/backup/manifest.go +++ /dev/null @@ -1,108 +0,0 @@ -package backup - -import ( - "fmt" - - "github.com/influxdata/influx-cli/v2/api" - br "github.com/influxdata/influx-cli/v2/internal/backup_restore" -) - -func ConvertBucketManifest(manifest api.BucketMetadataManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (br.ManifestBucketEntry, error) { - m := br.ManifestBucketEntry{ - OrganizationID: manifest.OrganizationID, - OrganizationName: manifest.OrganizationName, - BucketID: manifest.BucketID, - BucketName: manifest.BucketName, - Description: manifest.Description, - DefaultRetentionPolicy: manifest.DefaultRetentionPolicy, - RetentionPolicies: make([]br.ManifestRetentionPolicy, len(manifest.RetentionPolicies)), - } - - for i, rp := range manifest.RetentionPolicies { - var err error - m.RetentionPolicies[i], err = ConvertRetentionPolicy(rp, getShard) - if err != nil { - return br.ManifestBucketEntry{}, err - } - } - - return m, nil -} - -func ConvertRetentionPolicy(manifest api.RetentionPolicyManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (br.ManifestRetentionPolicy, error) { - m := br.ManifestRetentionPolicy{ - Name: manifest.Name, - ReplicaN: manifest.ReplicaN, - Duration: manifest.Duration, - ShardGroupDuration: manifest.ShardGroupDuration, - ShardGroups: make([]br.ManifestShardGroup, len(manifest.ShardGroups)), - Subscriptions: make([]br.ManifestSubscription, len(manifest.Subscriptions)), - } - - for i, sg := range manifest.ShardGroups { - var err error - m.ShardGroups[i], err = ConvertShardGroup(sg, getShard) - if err != nil { - return br.ManifestRetentionPolicy{}, err - } - } - - for i, s := range manifest.Subscriptions { - m.Subscriptions[i] = br.ManifestSubscription{ - Name: s.Name, - Mode: s.Mode, - Destinations: s.Destinations, - } - } - - return m, nil -} - -func ConvertShardGroup(manifest api.ShardGroupManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (br.ManifestShardGroup, error) { - m := br.ManifestShardGroup{ - ID: manifest.Id, - StartTime: manifest.StartTime, - EndTime: manifest.EndTime, - DeletedAt: manifest.DeletedAt, - TruncatedAt: manifest.TruncatedAt, - Shards: make([]br.ManifestShardEntry, 0, len(manifest.Shards)), - } - - for _, sh := range manifest.Shards { - maybeShard, err := ConvertShard(sh, getShard) - if err != nil { - return br.ManifestShardGroup{}, err - } - // Shard deleted mid-backup. - if maybeShard == nil { - continue - } - m.Shards = append(m.Shards, *maybeShard) - } - - return m, nil -} - -func ConvertShard(manifest api.ShardManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (*br.ManifestShardEntry, error) { - shardFileInfo, err := getShard(manifest.Id) - if err != nil { - return nil, fmt.Errorf("failed to download snapshot of shard %d: %w", manifest.Id, err) - } - if shardFileInfo == nil { - return nil, nil - } - - m := br.ManifestShardEntry{ - ID: manifest.Id, - ShardOwners: make([]br.ShardOwnerEntry, len(manifest.ShardOwners)), - ManifestFileEntry: *shardFileInfo, - } - - for i, o := range manifest.ShardOwners { - m.ShardOwners[i] = br.ShardOwnerEntry{ - NodeID: o.NodeID, - } - } - - return &m, nil -} diff --git a/clients/restore/manifest.go b/clients/restore/manifest.go index 284ac13..23d4882 100644 --- a/clients/restore/manifest.go +++ b/clients/restore/manifest.go @@ -1,10 +1,115 @@ package restore import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "github.com/influxdata/influx-cli/v2/api" br "github.com/influxdata/influx-cli/v2/internal/backup_restore" ) +// versionSwitch models the subset of fields needed to distinguish different versions of the CLI's backup manifest. +type versionSwitch struct { + Version int `json:"manifestVersion,omitempty"` +} + +// readManifest parses the manifest file at the given path, converting it to the latest version of our manifest +// if needed. +func readManifest(path string) (manifest br.Manifest, err error) { + var w struct { + versionSwitch + *br.Manifest + *legacyManifest + } + buf, err := os.ReadFile(path) + if err != nil { + return br.Manifest{}, fmt.Errorf("failed to read local manifest at %q: %w", path, err) + } + + if err := json.Unmarshal(buf, &w.versionSwitch); err != nil { + return br.Manifest{}, fmt.Errorf("failed to check version of local manifest at %q: %w", path, err) + } + switch w.versionSwitch.Version { + case br.ManifestVersion: + err = json.Unmarshal(buf, &manifest) + case 0: // InfluxDB 2.0.x manifests didn't have a version field. + var lm legacyManifest + if err := json.Unmarshal(buf, &lm); err != nil { + return br.Manifest{}, fmt.Errorf("failed to parse legacy manifest at %q: %w", path, err) + } + manifest, err = convertManifest(path, lm) + default: + return br.Manifest{}, fmt.Errorf("unsupported version %d found in manifest at %q", w.versionSwitch.Version, path) + } + + return +} + +// convertManifest converts a manifest from the 2.0.x CLI into the latest manifest schema. +// NOTE: 2.0.x manifests didn't contain all the info needed by 2.1.x+, so this process requires opening & inspecting +// the bolt file referenced by the legacy manifest. +func convertManifest(path string, lm legacyManifest) (br.Manifest, error) { + // Extract bucket metadata from the local KV snapshot. + boltPath := filepath.Join(filepath.Dir(path), lm.KV.FileName) + metadata, err := br.ExtractBucketMetadata(boltPath) + if err != nil { + return br.Manifest{}, err + } + shardManifestsById := make(map[int64]br.ManifestFileEntry, len(lm.Shards)) + for _, s := range lm.Shards { + shardManifestsById[s.ShardID] = br.ManifestFileEntry{ + FileName: s.FileName, + Size: s.Size, + Compression: br.GzipCompression, + } + } + + m := br.Manifest{ + Version: br.ManifestVersion, + KV: br.ManifestFileEntry{ + FileName: lm.KV.FileName, + Size: lm.KV.Size, + Compression: br.NoCompression, + }, + Buckets: make([]br.ManifestBucketEntry, len(metadata)), + } + for i, bkt := range metadata { + m.Buckets[i], err = br.ConvertBucketManifest(bkt, func(shardId int64) (*br.ManifestFileEntry, error) { + shardManifest, ok := shardManifestsById[shardId] + if !ok { + return nil, nil + } + return &shardManifest, nil + }) + if err != nil { + return br.Manifest{}, fmt.Errorf("failed to parse entry for bucket %q in legacy manifest at %q: %w", bkt.BucketID, path, err) + } + } + + return m, nil +} + +// legacyManifest models the subset of data stored in 2.0.x CLI backup manifests that is needed for conversion +// into the latest manifest format. +type legacyManifest struct { + KV legacyKV `json:"kv"` + Shards []legacyShard `json:"files"` +} + +type legacyKV struct { + FileName string `json:"fileName"` + Size int64 `json:"size"` +} + +type legacyShard struct { + ShardID int64 `json:"shardID"` + FileName string `json:"fileName"` + Size int64 `json:"size"` +} + +// ConvertBucketManifest converts a manifest parsed from local disk into a model compatible with the server-side API. func ConvertBucketManifest(manifest br.ManifestBucketEntry) api.BucketMetadataManifest { m := api.BucketMetadataManifest{ OrganizationID: manifest.OrganizationID, diff --git a/clients/restore/restore.go b/clients/restore/restore.go index d2153dd..e6471b1 100644 --- a/clients/restore/restore.go +++ b/clients/restore/restore.go @@ -110,11 +110,9 @@ func (c *Client) loadManifests(path string) error { continue } - var manifest br.Manifest - if buf, err := os.ReadFile(manifestFile); err != nil { - return fmt.Errorf("failed to read local manifest at %q: %w", manifestFile, err) - } else if err := json.Unmarshal(buf, &manifest); err != nil { - return fmt.Errorf("failed to parse manifest at %q: %w", manifestFile, err) + manifest, err := readManifest(manifestFile) + if err != nil { + return err } // Keep the latest KV and SQL overall. @@ -123,7 +121,10 @@ func (c *Client) loadManifests(path string) error { // Keep the latest manifest per-bucket. for _, bkt := range manifest.Buckets { - bucketManifests[bkt.BucketID] = bkt + // NOTE: Deduplicate here by keeping only the latest entry for each `/` pair. + // This prevents "bucket already exists" errors during the restore when the backup manifests contain + // entries for multiple buckets with the same name (which can happen when a bucket is deleted & re-created). + bucketManifests[fmt.Sprintf("%s/%s", bkt.OrganizationName, bkt.BucketName)] = bkt } } diff --git a/internal/backup_restore/bolt.go b/internal/backup_restore/bolt.go new file mode 100644 index 0000000..a22324b --- /dev/null +++ b/internal/backup_restore/bolt.go @@ -0,0 +1,214 @@ +package backup_restore + +import ( + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/influxdata/influx-cli/v2/api" + "go.etcd.io/bbolt" +) + +//go:generate protoc --gogo_out=. meta.proto + +// NOTE: An unfortunate naming collision below. Bolt calls its databases "buckets". +// These are the names that were used in the metadata DB for 2.0.x versions of influxdb. +var ( + bucketsBoltBucket = []byte("bucketsv1") + organizationsBoltBucket = []byte("organizationsv1") + v1MetadataBoltBucket = []byte("v1_tsm1_metadata") + v1MetadataBoltKey = []byte("meta.db") +) + +// influxdbBucketSchema models the JSON structure used by InfluxDB 2.0.x to serialize +// bucket metadata in the embedded KV store. +type influxdbBucketSchema struct { + ID string `json:"id"` + OrgID string `json:"orgID"` + Type int `json:"type"` + Name string `json:"name"` + Description *string `json:"description,omitempty"` + RetentionPeriod time.Duration `json:"retentionPeriod"` + ShardGroupDuration time.Duration `json:"ShardGroupDuration"` +} + +// influxdbOrganizationSchema models the JSON structure used by InfluxDB 2.0.x to serialize +// organization metadata in the embedded KV store. +type influxdbOrganizationSchema struct { + ID string `json:"id"` + Name string `json:"name"` +} + +// ExtractBucketMetadata reads a boltdb backed up from InfluxDB 2.0.x, converting a subset of the +// metadata it contains into a set of 2.1.x bucket manifests. +func ExtractBucketMetadata(boltPath string) ([]api.BucketMetadataManifest, error) { + db, err := bbolt.Open(boltPath, 0666, &bbolt.Options{ReadOnly: true, Timeout: 1 * time.Second}) + if err != nil { + // Hack to give a slightly nicer error message for a known failure mode when bolt calls + // mmap on a file system that doesn't support the MAP_SHARED option. + // + // See: https://github.com/boltdb/bolt/issues/272 + // See: https://stackoverflow.com/a/18421071 + if err.Error() == "invalid argument" { + return nil, fmt.Errorf("unable to open boltdb: mmap of %q may not support the MAP_SHARED option", boltPath) + } + + return nil, fmt.Errorf("unable to open boltdb: %w", err) + } + defer db.Close() + + // Read raw metadata needed to construct a manifest. + var buckets []influxdbBucketSchema + orgNamesById := map[string]string{} + dbInfoByBucketId := map[string]DatabaseInfo{} + + if err := db.View(func(tx *bbolt.Tx) error { + bucketDB := tx.Bucket(bucketsBoltBucket) + if bucketDB == nil { + return errors.New("bucket metadata not found in local KV store") + } + + if err := bucketDB.ForEach(func(k, v []byte) error { + var b influxdbBucketSchema + if err := json.Unmarshal(v, &b); err != nil { + return err + } + if b.Type != 1 { // 1 == "system" + buckets = append(buckets, b) + } + return nil + }); err != nil { + return fmt.Errorf("failed to read bucket metadata from local KV store: %w", err) + } + + orgsDB := tx.Bucket(organizationsBoltBucket) + if orgsDB == nil { + return errors.New("organization metadata not found in local KV store") + } + + if err := orgsDB.ForEach(func(k, v []byte) error { + var o influxdbOrganizationSchema + if err := json.Unmarshal(v, &o); err != nil { + return err + } + orgNamesById[o.ID] = o.Name + return nil + }); err != nil { + return fmt.Errorf("failed to read organization metadata from local KV store: %w", err) + } + + v1DB := tx.Bucket(v1MetadataBoltBucket) + if v1DB == nil { + return errors.New("v1 database info not found in local KV store") + } + fullMeta := v1DB.Get(v1MetadataBoltKey) + if fullMeta == nil { + return errors.New("v1 database info not found in local KV store") + } + + var pb Data + if err := proto.Unmarshal(fullMeta, &pb); err != nil { + return fmt.Errorf("failed to unmarshal v1 database info: %w", err) + } + for _, rawDBI := range pb.GetDatabases() { + dbInfoByBucketId[*rawDBI.Name] = *rawDBI + } + + return nil + }); err != nil { + return nil, err + } + + manifests := make([]api.BucketMetadataManifest, len(buckets)) + for i, b := range buckets { + orgName, ok := orgNamesById[b.OrgID] + if !ok { + return nil, fmt.Errorf("local KV store in inconsistent state: no organization found with ID %q", b.OrgID) + } + dbi, ok := dbInfoByBucketId[b.ID] + if !ok { + return nil, fmt.Errorf("local KV store in inconsistent state: no V1 database info found for bucket %q", b.Name) + } + manifests[i] = combineMetadata(b, orgName, dbi) + } + + return manifests, nil +} + +func combineMetadata(bucket influxdbBucketSchema, orgName string, dbi DatabaseInfo) api.BucketMetadataManifest { + m := api.BucketMetadataManifest{ + OrganizationID: bucket.OrgID, + OrganizationName: orgName, + BucketID: bucket.ID, + BucketName: bucket.Name, + DefaultRetentionPolicy: *dbi.DefaultRetentionPolicy, + RetentionPolicies: make([]api.RetentionPolicyManifest, len(dbi.RetentionPolicies)), + } + if bucket.Description != nil && *bucket.Description != "" { + m.Description = bucket.Description + } + for i, rp := range dbi.RetentionPolicies { + m.RetentionPolicies[i] = convertRPI(*rp) + } + return m +} + +func convertRPI(rpi RetentionPolicyInfo) api.RetentionPolicyManifest { + m := api.RetentionPolicyManifest{ + Name: *rpi.Name, + ReplicaN: int32(*rpi.ReplicaN), + Duration: *rpi.Duration, + ShardGroupDuration: *rpi.ShardGroupDuration, + ShardGroups: make([]api.ShardGroupManifest, len(rpi.ShardGroups)), + Subscriptions: make([]api.SubscriptionManifest, len(rpi.Subscriptions)), + } + for i, sg := range rpi.ShardGroups { + m.ShardGroups[i] = convertSGI(*sg) + } + for i, s := range rpi.Subscriptions { + m.Subscriptions[i] = api.SubscriptionManifest{ + Name: *s.Name, + Mode: *s.Mode, + Destinations: s.Destinations, + } + } + return m +} + +func convertSGI(sgi ShardGroupInfo) api.ShardGroupManifest { + var deleted, truncated *time.Time + if sgi.DeletedAt != nil { + d := time.Unix(0, *sgi.DeletedAt).UTC() + deleted = &d + } + if sgi.TruncatedAt != nil { + t := time.Unix(0, *sgi.TruncatedAt).UTC() + truncated = &t + } + + m := api.ShardGroupManifest{ + Id: int64(*sgi.ID), + StartTime: time.Unix(0, *sgi.StartTime).UTC(), + EndTime: time.Unix(0, *sgi.EndTime).UTC(), + DeletedAt: deleted, + TruncatedAt: truncated, + Shards: make([]api.ShardManifest, len(sgi.Shards)), + } + for i, s := range sgi.Shards { + m.Shards[i] = convertShard(*s) + } + return m +} + +func convertShard(shard ShardInfo) api.ShardManifest { + m := api.ShardManifest{ + Id: int64(*shard.ID), + ShardOwners: make([]api.ShardOwner, len(shard.Owners)), + } + for i, o := range shard.Owners { + m.ShardOwners[i] = api.ShardOwner{NodeID: int64(*o.NodeID)} + } + return m +} diff --git a/clients/backup/bolt_internal_test.go b/internal/backup_restore/bolt_test.go similarity index 94% rename from clients/backup/bolt_internal_test.go rename to internal/backup_restore/bolt_test.go index f7dc304..538cdfb 100644 --- a/clients/backup/bolt_internal_test.go +++ b/internal/backup_restore/bolt_test.go @@ -1,4 +1,4 @@ -package backup +package backup_restore_test import ( "compress/gzip" @@ -9,6 +9,7 @@ import ( "testing" "github.com/influxdata/influx-cli/v2/api" + "github.com/influxdata/influx-cli/v2/internal/backup_restore" "github.com/stretchr/testify/require" ) @@ -41,7 +42,7 @@ func TestExtractManifest(t *testing.T) { require.NoError(t, tmpBolt.Close()) require.NoError(t, err) - extracted, err := extractBucketManifest(tmpBoltPath) + extracted, err := backup_restore.ExtractBucketMetadata(tmpBoltPath) require.NoError(t, err) expected := []api.BucketMetadataManifest{ diff --git a/internal/backup_restore/manifest.go b/internal/backup_restore/manifest.go index b084ef8..6017686 100644 --- a/internal/backup_restore/manifest.go +++ b/internal/backup_restore/manifest.go @@ -3,6 +3,8 @@ package backup_restore import ( "fmt" "time" + + "github.com/influxdata/influx-cli/v2/api" ) type FileCompression int @@ -35,9 +37,13 @@ func (c FileCompression) String() string { } } -const ManifestExtension = "manifest" +const ( + ManifestExtension = "manifest" + ManifestVersion = 2 +) type Manifest struct { + Version int `json:"manifestVersion"` KV ManifestFileEntry `json:"kv"` SQL *ManifestFileEntry `json:"sql,omitempty"` Buckets []ManifestBucketEntry `json:"buckets"` @@ -92,3 +98,103 @@ type ManifestSubscription struct { Mode string `json:"mode"` Destinations []string `json:"destinations"` } + +func ConvertBucketManifest(manifest api.BucketMetadataManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (ManifestBucketEntry, error) { + m := ManifestBucketEntry{ + OrganizationID: manifest.OrganizationID, + OrganizationName: manifest.OrganizationName, + BucketID: manifest.BucketID, + BucketName: manifest.BucketName, + Description: manifest.Description, + DefaultRetentionPolicy: manifest.DefaultRetentionPolicy, + RetentionPolicies: make([]ManifestRetentionPolicy, len(manifest.RetentionPolicies)), + } + + for i, rp := range manifest.RetentionPolicies { + var err error + m.RetentionPolicies[i], err = ConvertRetentionPolicy(rp, getShard) + if err != nil { + return ManifestBucketEntry{}, err + } + } + + return m, nil +} + +func ConvertRetentionPolicy(manifest api.RetentionPolicyManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (ManifestRetentionPolicy, error) { + m := ManifestRetentionPolicy{ + Name: manifest.Name, + ReplicaN: manifest.ReplicaN, + Duration: manifest.Duration, + ShardGroupDuration: manifest.ShardGroupDuration, + ShardGroups: make([]ManifestShardGroup, len(manifest.ShardGroups)), + Subscriptions: make([]ManifestSubscription, len(manifest.Subscriptions)), + } + + for i, sg := range manifest.ShardGroups { + var err error + m.ShardGroups[i], err = ConvertShardGroup(sg, getShard) + if err != nil { + return ManifestRetentionPolicy{}, err + } + } + + for i, s := range manifest.Subscriptions { + m.Subscriptions[i] = ManifestSubscription{ + Name: s.Name, + Mode: s.Mode, + Destinations: s.Destinations, + } + } + + return m, nil +} + +func ConvertShardGroup(manifest api.ShardGroupManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (ManifestShardGroup, error) { + m := ManifestShardGroup{ + ID: manifest.Id, + StartTime: manifest.StartTime, + EndTime: manifest.EndTime, + DeletedAt: manifest.DeletedAt, + TruncatedAt: manifest.TruncatedAt, + Shards: make([]ManifestShardEntry, 0, len(manifest.Shards)), + } + + for _, sh := range manifest.Shards { + maybeShard, err := ConvertShard(sh, getShard) + if err != nil { + return ManifestShardGroup{}, err + } + // Shard deleted mid-backup. + if maybeShard == nil { + continue + } + m.Shards = append(m.Shards, *maybeShard) + } + + return m, nil +} + +func ConvertShard(manifest api.ShardManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (*ManifestShardEntry, error) { + shardFileInfo, err := getShard(manifest.Id) + if err != nil { + return nil, fmt.Errorf("failed to download snapshot of shard %d: %w", manifest.Id, err) + } + if shardFileInfo == nil { + return nil, nil + } + + m := ManifestShardEntry{ + ID: manifest.Id, + ShardOwners: make([]ShardOwnerEntry, len(manifest.ShardOwners)), + ManifestFileEntry: *shardFileInfo, + } + + for i, o := range manifest.ShardOwners { + m.ShardOwners[i] = ShardOwnerEntry{ + NodeID: o.NodeID, + } + } + + return &m, nil +} diff --git a/clients/backup/manifest_test.go b/internal/backup_restore/manifest_test.go similarity index 96% rename from clients/backup/manifest_test.go rename to internal/backup_restore/manifest_test.go index 8b5f45c..c1526bd 100644 --- a/clients/backup/manifest_test.go +++ b/internal/backup_restore/manifest_test.go @@ -1,4 +1,4 @@ -package backup_test +package backup_restore_test import ( "fmt" @@ -6,7 +6,6 @@ import ( "time" "github.com/influxdata/influx-cli/v2/api" - "github.com/influxdata/influx-cli/v2/clients/backup" br "github.com/influxdata/influx-cli/v2/internal/backup_restore" "github.com/stretchr/testify/require" ) @@ -99,7 +98,7 @@ func TestConvertBucketManifest(t *testing.T) { }, nil } - converted, err := backup.ConvertBucketManifest(manifest, fakeGetShard) + converted, err := br.ConvertBucketManifest(manifest, fakeGetShard) require.NoError(t, err) expected := br.ManifestBucketEntry{ diff --git a/clients/backup/testdata/test.bolt.gz b/internal/backup_restore/testdata/test.bolt.gz similarity index 100% rename from clients/backup/testdata/test.bolt.gz rename to internal/backup_restore/testdata/test.bolt.gz