feat: add logic for reading v2.0.x backup manifests in restore
(#186)
This commit is contained in:
@ -70,6 +70,7 @@ func (c *Client) Backup(ctx context.Context, params *Params) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c.baseName = time.Now().UTC().Format(backupFilenamePattern)
|
c.baseName = time.Now().UTC().Format(backupFilenamePattern)
|
||||||
|
c.manifest.Version = br.ManifestVersion
|
||||||
|
|
||||||
// The APIs we use to back up metadata depends on the server's version.
|
// The APIs we use to back up metadata depends on the server's version.
|
||||||
legacyServer, err := br.ServerIsLegacy(ctx, c.HealthApi)
|
legacyServer, err := br.ServerIsLegacy(ctx, c.HealthApi)
|
||||||
@ -228,7 +229,7 @@ func (c *Client) downloadMetadataLegacy(ctx context.Context, params *Params) err
|
|||||||
|
|
||||||
// Extract the metadata we need from the downloaded KV store, and convert it to a new-style manifest.
|
// Extract the metadata we need from the downloaded KV store, and convert it to a new-style manifest.
|
||||||
log.Println("INFO: Extracting bucket manifest from legacy KV snapshot")
|
log.Println("INFO: Extracting bucket manifest from legacy KV snapshot")
|
||||||
c.bucketMetadata, err = extractBucketManifest(tmpKv)
|
c.bucketMetadata, err = br.ExtractBucketMetadata(tmpKv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to extract bucket metadata from downloaded KV snapshot: %w", err)
|
return fmt.Errorf("failed to extract bucket metadata from downloaded KV snapshot: %w", err)
|
||||||
}
|
}
|
||||||
@ -283,7 +284,7 @@ func (c *Client) downloadBucketData(ctx context.Context, params *Params) error {
|
|||||||
if !params.matches(b) {
|
if !params.matches(b) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
bktManifest, err := ConvertBucketManifest(b, func(shardId int64) (*br.ManifestFileEntry, error) {
|
bktManifest, err := br.ConvertBucketManifest(b, func(shardId int64) (*br.ManifestFileEntry, error) {
|
||||||
return c.downloadShardData(ctx, params, shardId)
|
return c.downloadShardData(ctx, params, shardId)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1,349 +0,0 @@
|
|||||||
package backup
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
|
||||||
"github.com/influxdata/influx-cli/v2/api"
|
|
||||||
br "github.com/influxdata/influx-cli/v2/internal/backup_restore"
|
|
||||||
"go.etcd.io/bbolt"
|
|
||||||
)
|
|
||||||
|
|
||||||
//go:generate protoc --gogo_out=. internal/meta.proto
|
|
||||||
|
|
||||||
// NOTE: An unfortunate naming collision below. Bolt calls its databases "buckets".
|
|
||||||
// These are the names that were used in the metadata DB for 2.0.x versions of influxdb.
|
|
||||||
var (
|
|
||||||
bucketsBoltBucket = []byte("bucketsv1")
|
|
||||||
organizationsBoltBucket = []byte("organizationsv1")
|
|
||||||
v1MetadataBoltBucket = []byte("v1_tsm1_metadata")
|
|
||||||
v1MetadataBoltKey = []byte("meta.db")
|
|
||||||
)
|
|
||||||
|
|
||||||
// influxdbBucketSchema models the JSON structure used by InfluxDB 2.0.x to serialize
|
|
||||||
// bucket metadata in the embedded KV store.
|
|
||||||
type influxdbBucketSchema struct {
|
|
||||||
ID string `json:"id"`
|
|
||||||
OrgID string `json:"orgID"`
|
|
||||||
Type int `json:"type"`
|
|
||||||
Name string `json:"name"`
|
|
||||||
Description *string `json:"description,omitempty"`
|
|
||||||
RetentionPeriod time.Duration `json:"retentionPeriod"`
|
|
||||||
ShardGroupDuration time.Duration `json:"ShardGroupDuration"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// influxdbOrganizationSchema models the JSON structure used by InfluxDB 2.0.x to serialize
|
|
||||||
// organization metadata in the embedded KV store.
|
|
||||||
type influxdbOrganizationSchema struct {
|
|
||||||
ID string `json:"id"`
|
|
||||||
Name string `json:"name"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// influxdbV1DatabaseInfo models the protobuf structure used by InfluxDB 2.0.x to serialize
|
|
||||||
// database info in the embedded KV store.
|
|
||||||
type influxdbV1DatabaseInfo struct {
|
|
||||||
Name string
|
|
||||||
DefaultRetentionPolicy string
|
|
||||||
RetentionPolicies []influxdbV1RetentionPolicyInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
// influxdbV1RetentionPolicyInfo models the protobuf structure used by InfluxDB 2.0.x to serialize
|
|
||||||
// retention-policy info in the embedded KV store.
|
|
||||||
type influxdbV1RetentionPolicyInfo struct {
|
|
||||||
Name string
|
|
||||||
ReplicaN int32
|
|
||||||
Duration int64
|
|
||||||
ShardGroupDuration int64
|
|
||||||
ShardGroups []influxdbV1ShardGroupInfo
|
|
||||||
Subscriptions []influxdbV1SubscriptionInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
// influxdbV1ShardGroupInfo models the protobuf structure used by InfluxDB 2.0.x to serialize
|
|
||||||
// shard-group info in the embedded KV store.
|
|
||||||
type influxdbV1ShardGroupInfo struct {
|
|
||||||
ID int64
|
|
||||||
StartTime time.Time
|
|
||||||
EndTime time.Time
|
|
||||||
DeletedAt time.Time
|
|
||||||
Shards []influxdbV1ShardInfo
|
|
||||||
TruncatedAt time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
// influxdbV1ShardInfo models the protobuf structure used by InfluxDB 2.0.x to serialize
|
|
||||||
// shard info in the embedded KV store.
|
|
||||||
type influxdbV1ShardInfo struct {
|
|
||||||
ID int64
|
|
||||||
Owners []influxdbV1ShardOwnerInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
// inflxudbV1ShardOwnerInfo models the protobuf structure used by InfluxDB 2.0.x to serialize
|
|
||||||
// shard-owner info in the embedded KV store.
|
|
||||||
type influxdbV1ShardOwnerInfo struct {
|
|
||||||
NodeID int64
|
|
||||||
}
|
|
||||||
|
|
||||||
// influxdbV1SubscriptionInfo models the protobuf structure used by InfluxDB 2.0.x to serialize
|
|
||||||
// subscription info in the embedded KV store.
|
|
||||||
type influxdbV1SubscriptionInfo struct {
|
|
||||||
Name string
|
|
||||||
Mode string
|
|
||||||
Destinations []string
|
|
||||||
}
|
|
||||||
|
|
||||||
// extractBucketManifest reads a boltdb backed up from InfluxDB 2.0.x, converting a subset of the
|
|
||||||
// metadata it contains into a set of 2.1.x bucket manifests.
|
|
||||||
func extractBucketManifest(boltPath string) ([]api.BucketMetadataManifest, error) {
|
|
||||||
db, err := bbolt.Open(boltPath, 0666, &bbolt.Options{ReadOnly: true, Timeout: 1 * time.Second})
|
|
||||||
if err != nil {
|
|
||||||
// Hack to give a slightly nicer error message for a known failure mode when bolt calls
|
|
||||||
// mmap on a file system that doesn't support the MAP_SHARED option.
|
|
||||||
//
|
|
||||||
// See: https://github.com/boltdb/bolt/issues/272
|
|
||||||
// See: https://stackoverflow.com/a/18421071
|
|
||||||
if err.Error() == "invalid argument" {
|
|
||||||
return nil, fmt.Errorf("unable to open boltdb: mmap of %q may not support the MAP_SHARED option", boltPath)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, fmt.Errorf("unable to open boltdb: %w", err)
|
|
||||||
}
|
|
||||||
defer db.Close()
|
|
||||||
|
|
||||||
// Read raw metadata needed to construct a manifest.
|
|
||||||
var buckets []influxdbBucketSchema
|
|
||||||
orgNamesById := map[string]string{}
|
|
||||||
dbInfoByBucketId := map[string]influxdbV1DatabaseInfo{}
|
|
||||||
|
|
||||||
if err := db.View(func(tx *bbolt.Tx) error {
|
|
||||||
bucketDB := tx.Bucket(bucketsBoltBucket)
|
|
||||||
if bucketDB == nil {
|
|
||||||
return errors.New("bucket metadata not found in local KV store")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := bucketDB.ForEach(func(k, v []byte) error {
|
|
||||||
var b influxdbBucketSchema
|
|
||||||
if err := json.Unmarshal(v, &b); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if b.Type != 1 { // 1 == "system"
|
|
||||||
buckets = append(buckets, b)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}); err != nil {
|
|
||||||
return fmt.Errorf("failed to read bucket metadata from local KV store: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
orgsDB := tx.Bucket(organizationsBoltBucket)
|
|
||||||
if orgsDB == nil {
|
|
||||||
return errors.New("organization metadata not found in local KV store")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := orgsDB.ForEach(func(k, v []byte) error {
|
|
||||||
var o influxdbOrganizationSchema
|
|
||||||
if err := json.Unmarshal(v, &o); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
orgNamesById[o.ID] = o.Name
|
|
||||||
return nil
|
|
||||||
}); err != nil {
|
|
||||||
return fmt.Errorf("failed to read organization metadata from local KV store: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
v1DB := tx.Bucket(v1MetadataBoltBucket)
|
|
||||||
if v1DB == nil {
|
|
||||||
return errors.New("v1 database info not found in local KV store")
|
|
||||||
}
|
|
||||||
fullMeta := v1DB.Get(v1MetadataBoltKey)
|
|
||||||
if fullMeta == nil {
|
|
||||||
return errors.New("v1 database info not found in local KV store")
|
|
||||||
}
|
|
||||||
|
|
||||||
var pb br.Data
|
|
||||||
if err := proto.Unmarshal(fullMeta, &pb); err != nil {
|
|
||||||
return fmt.Errorf("failed to unmarshal v1 database info: %w", err)
|
|
||||||
}
|
|
||||||
for _, rawDBI := range pb.GetDatabases() {
|
|
||||||
if rawDBI == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
unmarshalled := unmarshalRawDBI(*rawDBI)
|
|
||||||
dbInfoByBucketId[unmarshalled.Name] = unmarshalled
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
manifests := make([]api.BucketMetadataManifest, len(buckets))
|
|
||||||
for i, b := range buckets {
|
|
||||||
orgName, ok := orgNamesById[b.OrgID]
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("local KV store in inconsistent state: no organization found with ID %q", b.OrgID)
|
|
||||||
}
|
|
||||||
dbi, ok := dbInfoByBucketId[b.ID]
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("local KV store in inconsistent state: no V1 database info found for bucket %q", b.Name)
|
|
||||||
}
|
|
||||||
manifests[i] = combineMetadata(b, orgName, dbi)
|
|
||||||
}
|
|
||||||
|
|
||||||
return manifests, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func unmarshalRawDBI(rawDBI br.DatabaseInfo) influxdbV1DatabaseInfo {
|
|
||||||
dbi := influxdbV1DatabaseInfo{
|
|
||||||
Name: rawDBI.GetName(),
|
|
||||||
DefaultRetentionPolicy: rawDBI.GetDefaultRetentionPolicy(),
|
|
||||||
RetentionPolicies: make([]influxdbV1RetentionPolicyInfo, 0, len(rawDBI.GetRetentionPolicies())),
|
|
||||||
}
|
|
||||||
for _, rp := range rawDBI.GetRetentionPolicies() {
|
|
||||||
if rp == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
dbi.RetentionPolicies = append(dbi.RetentionPolicies, unmarshalRawRPI(*rp))
|
|
||||||
}
|
|
||||||
return dbi
|
|
||||||
}
|
|
||||||
|
|
||||||
func unmarshalRawRPI(rawRPI br.RetentionPolicyInfo) influxdbV1RetentionPolicyInfo {
|
|
||||||
rpi := influxdbV1RetentionPolicyInfo{
|
|
||||||
Name: rawRPI.GetName(),
|
|
||||||
ReplicaN: int32(rawRPI.GetReplicaN()),
|
|
||||||
Duration: rawRPI.GetDuration(),
|
|
||||||
ShardGroupDuration: rawRPI.GetShardGroupDuration(),
|
|
||||||
ShardGroups: make([]influxdbV1ShardGroupInfo, 0, len(rawRPI.GetShardGroups())),
|
|
||||||
Subscriptions: make([]influxdbV1SubscriptionInfo, 0, len(rawRPI.GetSubscriptions())),
|
|
||||||
}
|
|
||||||
for _, sg := range rawRPI.GetShardGroups() {
|
|
||||||
if sg == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
rpi.ShardGroups = append(rpi.ShardGroups, unmarshalRawSGI(*sg))
|
|
||||||
}
|
|
||||||
for _, s := range rawRPI.GetSubscriptions() {
|
|
||||||
if s == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
rpi.Subscriptions = append(rpi.Subscriptions, influxdbV1SubscriptionInfo{
|
|
||||||
Name: s.GetName(),
|
|
||||||
Mode: s.GetMode(),
|
|
||||||
Destinations: s.GetDestinations(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return rpi
|
|
||||||
}
|
|
||||||
|
|
||||||
func unmarshalRawSGI(rawSGI br.ShardGroupInfo) influxdbV1ShardGroupInfo {
|
|
||||||
sgi := influxdbV1ShardGroupInfo{
|
|
||||||
ID: int64(rawSGI.GetID()),
|
|
||||||
StartTime: time.Unix(0, rawSGI.GetStartTime()).UTC(),
|
|
||||||
EndTime: time.Unix(0, rawSGI.GetEndTime()).UTC(),
|
|
||||||
DeletedAt: time.Unix(0, rawSGI.GetDeletedAt()).UTC(),
|
|
||||||
Shards: make([]influxdbV1ShardInfo, 0, len(rawSGI.GetShards())),
|
|
||||||
TruncatedAt: time.Unix(0, rawSGI.GetTruncatedAt()).UTC(),
|
|
||||||
}
|
|
||||||
for _, s := range rawSGI.GetShards() {
|
|
||||||
if s == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
sgi.Shards = append(sgi.Shards, unmarshalRawShard(*s))
|
|
||||||
}
|
|
||||||
return sgi
|
|
||||||
}
|
|
||||||
|
|
||||||
func unmarshalRawShard(rawShard br.ShardInfo) influxdbV1ShardInfo {
|
|
||||||
si := influxdbV1ShardInfo{
|
|
||||||
ID: int64(rawShard.GetID()),
|
|
||||||
}
|
|
||||||
// If deprecated "OwnerIDs" exists then convert it to "Owners" format.
|
|
||||||
//lint:ignore SA1019 we need to check for the presence of the deprecated field so we can convert it
|
|
||||||
oldStyleOwnerIds := rawShard.GetOwnerIDs()
|
|
||||||
if len(oldStyleOwnerIds) > 0 {
|
|
||||||
si.Owners = make([]influxdbV1ShardOwnerInfo, len(oldStyleOwnerIds))
|
|
||||||
for i, oid := range oldStyleOwnerIds {
|
|
||||||
si.Owners[i] = influxdbV1ShardOwnerInfo{NodeID: int64(oid)}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
si.Owners = make([]influxdbV1ShardOwnerInfo, 0, len(rawShard.GetOwners()))
|
|
||||||
for _, o := range rawShard.GetOwners() {
|
|
||||||
if o == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
si.Owners = append(si.Owners, influxdbV1ShardOwnerInfo{NodeID: int64(o.GetNodeID())})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return si
|
|
||||||
}
|
|
||||||
|
|
||||||
func combineMetadata(bucket influxdbBucketSchema, orgName string, dbi influxdbV1DatabaseInfo) api.BucketMetadataManifest {
|
|
||||||
m := api.BucketMetadataManifest{
|
|
||||||
OrganizationID: bucket.OrgID,
|
|
||||||
OrganizationName: orgName,
|
|
||||||
BucketID: bucket.ID,
|
|
||||||
BucketName: bucket.Name,
|
|
||||||
DefaultRetentionPolicy: dbi.DefaultRetentionPolicy,
|
|
||||||
RetentionPolicies: make([]api.RetentionPolicyManifest, len(dbi.RetentionPolicies)),
|
|
||||||
}
|
|
||||||
if bucket.Description != nil && *bucket.Description != "" {
|
|
||||||
m.Description = bucket.Description
|
|
||||||
}
|
|
||||||
for i, rp := range dbi.RetentionPolicies {
|
|
||||||
m.RetentionPolicies[i] = convertRPI(rp)
|
|
||||||
}
|
|
||||||
return m
|
|
||||||
}
|
|
||||||
|
|
||||||
func convertRPI(rpi influxdbV1RetentionPolicyInfo) api.RetentionPolicyManifest {
|
|
||||||
m := api.RetentionPolicyManifest{
|
|
||||||
Name: rpi.Name,
|
|
||||||
ReplicaN: rpi.ReplicaN,
|
|
||||||
Duration: rpi.Duration,
|
|
||||||
ShardGroupDuration: rpi.ShardGroupDuration,
|
|
||||||
ShardGroups: make([]api.ShardGroupManifest, len(rpi.ShardGroups)),
|
|
||||||
Subscriptions: make([]api.SubscriptionManifest, len(rpi.Subscriptions)),
|
|
||||||
}
|
|
||||||
for i, sg := range rpi.ShardGroups {
|
|
||||||
m.ShardGroups[i] = convertSGI(sg)
|
|
||||||
}
|
|
||||||
for i, s := range rpi.Subscriptions {
|
|
||||||
m.Subscriptions[i] = api.SubscriptionManifest{
|
|
||||||
Name: s.Name,
|
|
||||||
Mode: s.Mode,
|
|
||||||
Destinations: s.Destinations,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return m
|
|
||||||
}
|
|
||||||
|
|
||||||
func convertSGI(sgi influxdbV1ShardGroupInfo) api.ShardGroupManifest {
|
|
||||||
m := api.ShardGroupManifest{
|
|
||||||
Id: sgi.ID,
|
|
||||||
StartTime: sgi.StartTime,
|
|
||||||
EndTime: sgi.EndTime,
|
|
||||||
Shards: make([]api.ShardManifest, len(sgi.Shards)),
|
|
||||||
}
|
|
||||||
if sgi.DeletedAt.Unix() != 0 {
|
|
||||||
m.DeletedAt = &sgi.DeletedAt
|
|
||||||
}
|
|
||||||
if sgi.TruncatedAt.Unix() != 0 {
|
|
||||||
m.TruncatedAt = &sgi.TruncatedAt
|
|
||||||
}
|
|
||||||
for i, s := range sgi.Shards {
|
|
||||||
m.Shards[i] = convertShard(s)
|
|
||||||
}
|
|
||||||
return m
|
|
||||||
}
|
|
||||||
|
|
||||||
func convertShard(shard influxdbV1ShardInfo) api.ShardManifest {
|
|
||||||
m := api.ShardManifest{
|
|
||||||
Id: shard.ID,
|
|
||||||
ShardOwners: make([]api.ShardOwner, len(shard.Owners)),
|
|
||||||
}
|
|
||||||
for i, o := range shard.Owners {
|
|
||||||
m.ShardOwners[i] = api.ShardOwner{NodeID: o.NodeID}
|
|
||||||
}
|
|
||||||
return m
|
|
||||||
}
|
|
@ -1,108 +0,0 @@
|
|||||||
package backup
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/influxdata/influx-cli/v2/api"
|
|
||||||
br "github.com/influxdata/influx-cli/v2/internal/backup_restore"
|
|
||||||
)
|
|
||||||
|
|
||||||
func ConvertBucketManifest(manifest api.BucketMetadataManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (br.ManifestBucketEntry, error) {
|
|
||||||
m := br.ManifestBucketEntry{
|
|
||||||
OrganizationID: manifest.OrganizationID,
|
|
||||||
OrganizationName: manifest.OrganizationName,
|
|
||||||
BucketID: manifest.BucketID,
|
|
||||||
BucketName: manifest.BucketName,
|
|
||||||
Description: manifest.Description,
|
|
||||||
DefaultRetentionPolicy: manifest.DefaultRetentionPolicy,
|
|
||||||
RetentionPolicies: make([]br.ManifestRetentionPolicy, len(manifest.RetentionPolicies)),
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, rp := range manifest.RetentionPolicies {
|
|
||||||
var err error
|
|
||||||
m.RetentionPolicies[i], err = ConvertRetentionPolicy(rp, getShard)
|
|
||||||
if err != nil {
|
|
||||||
return br.ManifestBucketEntry{}, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func ConvertRetentionPolicy(manifest api.RetentionPolicyManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (br.ManifestRetentionPolicy, error) {
|
|
||||||
m := br.ManifestRetentionPolicy{
|
|
||||||
Name: manifest.Name,
|
|
||||||
ReplicaN: manifest.ReplicaN,
|
|
||||||
Duration: manifest.Duration,
|
|
||||||
ShardGroupDuration: manifest.ShardGroupDuration,
|
|
||||||
ShardGroups: make([]br.ManifestShardGroup, len(manifest.ShardGroups)),
|
|
||||||
Subscriptions: make([]br.ManifestSubscription, len(manifest.Subscriptions)),
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, sg := range manifest.ShardGroups {
|
|
||||||
var err error
|
|
||||||
m.ShardGroups[i], err = ConvertShardGroup(sg, getShard)
|
|
||||||
if err != nil {
|
|
||||||
return br.ManifestRetentionPolicy{}, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, s := range manifest.Subscriptions {
|
|
||||||
m.Subscriptions[i] = br.ManifestSubscription{
|
|
||||||
Name: s.Name,
|
|
||||||
Mode: s.Mode,
|
|
||||||
Destinations: s.Destinations,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func ConvertShardGroup(manifest api.ShardGroupManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (br.ManifestShardGroup, error) {
|
|
||||||
m := br.ManifestShardGroup{
|
|
||||||
ID: manifest.Id,
|
|
||||||
StartTime: manifest.StartTime,
|
|
||||||
EndTime: manifest.EndTime,
|
|
||||||
DeletedAt: manifest.DeletedAt,
|
|
||||||
TruncatedAt: manifest.TruncatedAt,
|
|
||||||
Shards: make([]br.ManifestShardEntry, 0, len(manifest.Shards)),
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, sh := range manifest.Shards {
|
|
||||||
maybeShard, err := ConvertShard(sh, getShard)
|
|
||||||
if err != nil {
|
|
||||||
return br.ManifestShardGroup{}, err
|
|
||||||
}
|
|
||||||
// Shard deleted mid-backup.
|
|
||||||
if maybeShard == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
m.Shards = append(m.Shards, *maybeShard)
|
|
||||||
}
|
|
||||||
|
|
||||||
return m, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func ConvertShard(manifest api.ShardManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (*br.ManifestShardEntry, error) {
|
|
||||||
shardFileInfo, err := getShard(manifest.Id)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to download snapshot of shard %d: %w", manifest.Id, err)
|
|
||||||
}
|
|
||||||
if shardFileInfo == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
m := br.ManifestShardEntry{
|
|
||||||
ID: manifest.Id,
|
|
||||||
ShardOwners: make([]br.ShardOwnerEntry, len(manifest.ShardOwners)),
|
|
||||||
ManifestFileEntry: *shardFileInfo,
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, o := range manifest.ShardOwners {
|
|
||||||
m.ShardOwners[i] = br.ShardOwnerEntry{
|
|
||||||
NodeID: o.NodeID,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return &m, nil
|
|
||||||
}
|
|
@ -1,10 +1,115 @@
|
|||||||
package restore
|
package restore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
"github.com/influxdata/influx-cli/v2/api"
|
"github.com/influxdata/influx-cli/v2/api"
|
||||||
br "github.com/influxdata/influx-cli/v2/internal/backup_restore"
|
br "github.com/influxdata/influx-cli/v2/internal/backup_restore"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// versionSwitch models the subset of fields needed to distinguish different versions of the CLI's backup manifest.
|
||||||
|
type versionSwitch struct {
|
||||||
|
Version int `json:"manifestVersion,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// readManifest parses the manifest file at the given path, converting it to the latest version of our manifest
|
||||||
|
// if needed.
|
||||||
|
func readManifest(path string) (manifest br.Manifest, err error) {
|
||||||
|
var w struct {
|
||||||
|
versionSwitch
|
||||||
|
*br.Manifest
|
||||||
|
*legacyManifest
|
||||||
|
}
|
||||||
|
buf, err := os.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return br.Manifest{}, fmt.Errorf("failed to read local manifest at %q: %w", path, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(buf, &w.versionSwitch); err != nil {
|
||||||
|
return br.Manifest{}, fmt.Errorf("failed to check version of local manifest at %q: %w", path, err)
|
||||||
|
}
|
||||||
|
switch w.versionSwitch.Version {
|
||||||
|
case br.ManifestVersion:
|
||||||
|
err = json.Unmarshal(buf, &manifest)
|
||||||
|
case 0: // InfluxDB 2.0.x manifests didn't have a version field.
|
||||||
|
var lm legacyManifest
|
||||||
|
if err := json.Unmarshal(buf, &lm); err != nil {
|
||||||
|
return br.Manifest{}, fmt.Errorf("failed to parse legacy manifest at %q: %w", path, err)
|
||||||
|
}
|
||||||
|
manifest, err = convertManifest(path, lm)
|
||||||
|
default:
|
||||||
|
return br.Manifest{}, fmt.Errorf("unsupported version %d found in manifest at %q", w.versionSwitch.Version, path)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// convertManifest converts a manifest from the 2.0.x CLI into the latest manifest schema.
|
||||||
|
// NOTE: 2.0.x manifests didn't contain all the info needed by 2.1.x+, so this process requires opening & inspecting
|
||||||
|
// the bolt file referenced by the legacy manifest.
|
||||||
|
func convertManifest(path string, lm legacyManifest) (br.Manifest, error) {
|
||||||
|
// Extract bucket metadata from the local KV snapshot.
|
||||||
|
boltPath := filepath.Join(filepath.Dir(path), lm.KV.FileName)
|
||||||
|
metadata, err := br.ExtractBucketMetadata(boltPath)
|
||||||
|
if err != nil {
|
||||||
|
return br.Manifest{}, err
|
||||||
|
}
|
||||||
|
shardManifestsById := make(map[int64]br.ManifestFileEntry, len(lm.Shards))
|
||||||
|
for _, s := range lm.Shards {
|
||||||
|
shardManifestsById[s.ShardID] = br.ManifestFileEntry{
|
||||||
|
FileName: s.FileName,
|
||||||
|
Size: s.Size,
|
||||||
|
Compression: br.GzipCompression,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m := br.Manifest{
|
||||||
|
Version: br.ManifestVersion,
|
||||||
|
KV: br.ManifestFileEntry{
|
||||||
|
FileName: lm.KV.FileName,
|
||||||
|
Size: lm.KV.Size,
|
||||||
|
Compression: br.NoCompression,
|
||||||
|
},
|
||||||
|
Buckets: make([]br.ManifestBucketEntry, len(metadata)),
|
||||||
|
}
|
||||||
|
for i, bkt := range metadata {
|
||||||
|
m.Buckets[i], err = br.ConvertBucketManifest(bkt, func(shardId int64) (*br.ManifestFileEntry, error) {
|
||||||
|
shardManifest, ok := shardManifestsById[shardId]
|
||||||
|
if !ok {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return &shardManifest, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return br.Manifest{}, fmt.Errorf("failed to parse entry for bucket %q in legacy manifest at %q: %w", bkt.BucketID, path, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// legacyManifest models the subset of data stored in 2.0.x CLI backup manifests that is needed for conversion
|
||||||
|
// into the latest manifest format.
|
||||||
|
type legacyManifest struct {
|
||||||
|
KV legacyKV `json:"kv"`
|
||||||
|
Shards []legacyShard `json:"files"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type legacyKV struct {
|
||||||
|
FileName string `json:"fileName"`
|
||||||
|
Size int64 `json:"size"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type legacyShard struct {
|
||||||
|
ShardID int64 `json:"shardID"`
|
||||||
|
FileName string `json:"fileName"`
|
||||||
|
Size int64 `json:"size"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConvertBucketManifest converts a manifest parsed from local disk into a model compatible with the server-side API.
|
||||||
func ConvertBucketManifest(manifest br.ManifestBucketEntry) api.BucketMetadataManifest {
|
func ConvertBucketManifest(manifest br.ManifestBucketEntry) api.BucketMetadataManifest {
|
||||||
m := api.BucketMetadataManifest{
|
m := api.BucketMetadataManifest{
|
||||||
OrganizationID: manifest.OrganizationID,
|
OrganizationID: manifest.OrganizationID,
|
||||||
|
@ -110,11 +110,9 @@ func (c *Client) loadManifests(path string) error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var manifest br.Manifest
|
manifest, err := readManifest(manifestFile)
|
||||||
if buf, err := os.ReadFile(manifestFile); err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to read local manifest at %q: %w", manifestFile, err)
|
return err
|
||||||
} else if err := json.Unmarshal(buf, &manifest); err != nil {
|
|
||||||
return fmt.Errorf("failed to parse manifest at %q: %w", manifestFile, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Keep the latest KV and SQL overall.
|
// Keep the latest KV and SQL overall.
|
||||||
@ -123,7 +121,10 @@ func (c *Client) loadManifests(path string) error {
|
|||||||
|
|
||||||
// Keep the latest manifest per-bucket.
|
// Keep the latest manifest per-bucket.
|
||||||
for _, bkt := range manifest.Buckets {
|
for _, bkt := range manifest.Buckets {
|
||||||
bucketManifests[bkt.BucketID] = bkt
|
// NOTE: Deduplicate here by keeping only the latest entry for each `<org-name>/<bucket-name>` pair.
|
||||||
|
// This prevents "bucket already exists" errors during the restore when the backup manifests contain
|
||||||
|
// entries for multiple buckets with the same name (which can happen when a bucket is deleted & re-created).
|
||||||
|
bucketManifests[fmt.Sprintf("%s/%s", bkt.OrganizationName, bkt.BucketName)] = bkt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
214
internal/backup_restore/bolt.go
Normal file
214
internal/backup_restore/bolt.go
Normal file
@ -0,0 +1,214 @@
|
|||||||
|
package backup_restore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gogo/protobuf/proto"
|
||||||
|
"github.com/influxdata/influx-cli/v2/api"
|
||||||
|
"go.etcd.io/bbolt"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:generate protoc --gogo_out=. meta.proto
|
||||||
|
|
||||||
|
// NOTE: An unfortunate naming collision below. Bolt calls its databases "buckets".
|
||||||
|
// These are the names that were used in the metadata DB for 2.0.x versions of influxdb.
|
||||||
|
var (
|
||||||
|
bucketsBoltBucket = []byte("bucketsv1")
|
||||||
|
organizationsBoltBucket = []byte("organizationsv1")
|
||||||
|
v1MetadataBoltBucket = []byte("v1_tsm1_metadata")
|
||||||
|
v1MetadataBoltKey = []byte("meta.db")
|
||||||
|
)
|
||||||
|
|
||||||
|
// influxdbBucketSchema models the JSON structure used by InfluxDB 2.0.x to serialize
|
||||||
|
// bucket metadata in the embedded KV store.
|
||||||
|
type influxdbBucketSchema struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
OrgID string `json:"orgID"`
|
||||||
|
Type int `json:"type"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Description *string `json:"description,omitempty"`
|
||||||
|
RetentionPeriod time.Duration `json:"retentionPeriod"`
|
||||||
|
ShardGroupDuration time.Duration `json:"ShardGroupDuration"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// influxdbOrganizationSchema models the JSON structure used by InfluxDB 2.0.x to serialize
|
||||||
|
// organization metadata in the embedded KV store.
|
||||||
|
type influxdbOrganizationSchema struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExtractBucketMetadata reads a boltdb backed up from InfluxDB 2.0.x, converting a subset of the
|
||||||
|
// metadata it contains into a set of 2.1.x bucket manifests.
|
||||||
|
func ExtractBucketMetadata(boltPath string) ([]api.BucketMetadataManifest, error) {
|
||||||
|
db, err := bbolt.Open(boltPath, 0666, &bbolt.Options{ReadOnly: true, Timeout: 1 * time.Second})
|
||||||
|
if err != nil {
|
||||||
|
// Hack to give a slightly nicer error message for a known failure mode when bolt calls
|
||||||
|
// mmap on a file system that doesn't support the MAP_SHARED option.
|
||||||
|
//
|
||||||
|
// See: https://github.com/boltdb/bolt/issues/272
|
||||||
|
// See: https://stackoverflow.com/a/18421071
|
||||||
|
if err.Error() == "invalid argument" {
|
||||||
|
return nil, fmt.Errorf("unable to open boltdb: mmap of %q may not support the MAP_SHARED option", boltPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("unable to open boltdb: %w", err)
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
// Read raw metadata needed to construct a manifest.
|
||||||
|
var buckets []influxdbBucketSchema
|
||||||
|
orgNamesById := map[string]string{}
|
||||||
|
dbInfoByBucketId := map[string]DatabaseInfo{}
|
||||||
|
|
||||||
|
if err := db.View(func(tx *bbolt.Tx) error {
|
||||||
|
bucketDB := tx.Bucket(bucketsBoltBucket)
|
||||||
|
if bucketDB == nil {
|
||||||
|
return errors.New("bucket metadata not found in local KV store")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bucketDB.ForEach(func(k, v []byte) error {
|
||||||
|
var b influxdbBucketSchema
|
||||||
|
if err := json.Unmarshal(v, &b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if b.Type != 1 { // 1 == "system"
|
||||||
|
buckets = append(buckets, b)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return fmt.Errorf("failed to read bucket metadata from local KV store: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
orgsDB := tx.Bucket(organizationsBoltBucket)
|
||||||
|
if orgsDB == nil {
|
||||||
|
return errors.New("organization metadata not found in local KV store")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := orgsDB.ForEach(func(k, v []byte) error {
|
||||||
|
var o influxdbOrganizationSchema
|
||||||
|
if err := json.Unmarshal(v, &o); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
orgNamesById[o.ID] = o.Name
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return fmt.Errorf("failed to read organization metadata from local KV store: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
v1DB := tx.Bucket(v1MetadataBoltBucket)
|
||||||
|
if v1DB == nil {
|
||||||
|
return errors.New("v1 database info not found in local KV store")
|
||||||
|
}
|
||||||
|
fullMeta := v1DB.Get(v1MetadataBoltKey)
|
||||||
|
if fullMeta == nil {
|
||||||
|
return errors.New("v1 database info not found in local KV store")
|
||||||
|
}
|
||||||
|
|
||||||
|
var pb Data
|
||||||
|
if err := proto.Unmarshal(fullMeta, &pb); err != nil {
|
||||||
|
return fmt.Errorf("failed to unmarshal v1 database info: %w", err)
|
||||||
|
}
|
||||||
|
for _, rawDBI := range pb.GetDatabases() {
|
||||||
|
dbInfoByBucketId[*rawDBI.Name] = *rawDBI
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
manifests := make([]api.BucketMetadataManifest, len(buckets))
|
||||||
|
for i, b := range buckets {
|
||||||
|
orgName, ok := orgNamesById[b.OrgID]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("local KV store in inconsistent state: no organization found with ID %q", b.OrgID)
|
||||||
|
}
|
||||||
|
dbi, ok := dbInfoByBucketId[b.ID]
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("local KV store in inconsistent state: no V1 database info found for bucket %q", b.Name)
|
||||||
|
}
|
||||||
|
manifests[i] = combineMetadata(b, orgName, dbi)
|
||||||
|
}
|
||||||
|
|
||||||
|
return manifests, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func combineMetadata(bucket influxdbBucketSchema, orgName string, dbi DatabaseInfo) api.BucketMetadataManifest {
|
||||||
|
m := api.BucketMetadataManifest{
|
||||||
|
OrganizationID: bucket.OrgID,
|
||||||
|
OrganizationName: orgName,
|
||||||
|
BucketID: bucket.ID,
|
||||||
|
BucketName: bucket.Name,
|
||||||
|
DefaultRetentionPolicy: *dbi.DefaultRetentionPolicy,
|
||||||
|
RetentionPolicies: make([]api.RetentionPolicyManifest, len(dbi.RetentionPolicies)),
|
||||||
|
}
|
||||||
|
if bucket.Description != nil && *bucket.Description != "" {
|
||||||
|
m.Description = bucket.Description
|
||||||
|
}
|
||||||
|
for i, rp := range dbi.RetentionPolicies {
|
||||||
|
m.RetentionPolicies[i] = convertRPI(*rp)
|
||||||
|
}
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
func convertRPI(rpi RetentionPolicyInfo) api.RetentionPolicyManifest {
|
||||||
|
m := api.RetentionPolicyManifest{
|
||||||
|
Name: *rpi.Name,
|
||||||
|
ReplicaN: int32(*rpi.ReplicaN),
|
||||||
|
Duration: *rpi.Duration,
|
||||||
|
ShardGroupDuration: *rpi.ShardGroupDuration,
|
||||||
|
ShardGroups: make([]api.ShardGroupManifest, len(rpi.ShardGroups)),
|
||||||
|
Subscriptions: make([]api.SubscriptionManifest, len(rpi.Subscriptions)),
|
||||||
|
}
|
||||||
|
for i, sg := range rpi.ShardGroups {
|
||||||
|
m.ShardGroups[i] = convertSGI(*sg)
|
||||||
|
}
|
||||||
|
for i, s := range rpi.Subscriptions {
|
||||||
|
m.Subscriptions[i] = api.SubscriptionManifest{
|
||||||
|
Name: *s.Name,
|
||||||
|
Mode: *s.Mode,
|
||||||
|
Destinations: s.Destinations,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
func convertSGI(sgi ShardGroupInfo) api.ShardGroupManifest {
|
||||||
|
var deleted, truncated *time.Time
|
||||||
|
if sgi.DeletedAt != nil {
|
||||||
|
d := time.Unix(0, *sgi.DeletedAt).UTC()
|
||||||
|
deleted = &d
|
||||||
|
}
|
||||||
|
if sgi.TruncatedAt != nil {
|
||||||
|
t := time.Unix(0, *sgi.TruncatedAt).UTC()
|
||||||
|
truncated = &t
|
||||||
|
}
|
||||||
|
|
||||||
|
m := api.ShardGroupManifest{
|
||||||
|
Id: int64(*sgi.ID),
|
||||||
|
StartTime: time.Unix(0, *sgi.StartTime).UTC(),
|
||||||
|
EndTime: time.Unix(0, *sgi.EndTime).UTC(),
|
||||||
|
DeletedAt: deleted,
|
||||||
|
TruncatedAt: truncated,
|
||||||
|
Shards: make([]api.ShardManifest, len(sgi.Shards)),
|
||||||
|
}
|
||||||
|
for i, s := range sgi.Shards {
|
||||||
|
m.Shards[i] = convertShard(*s)
|
||||||
|
}
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
func convertShard(shard ShardInfo) api.ShardManifest {
|
||||||
|
m := api.ShardManifest{
|
||||||
|
Id: int64(*shard.ID),
|
||||||
|
ShardOwners: make([]api.ShardOwner, len(shard.Owners)),
|
||||||
|
}
|
||||||
|
for i, o := range shard.Owners {
|
||||||
|
m.ShardOwners[i] = api.ShardOwner{NodeID: int64(*o.NodeID)}
|
||||||
|
}
|
||||||
|
return m
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package backup
|
package backup_restore_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
@ -9,6 +9,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/influxdata/influx-cli/v2/api"
|
"github.com/influxdata/influx-cli/v2/api"
|
||||||
|
"github.com/influxdata/influx-cli/v2/internal/backup_restore"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -41,7 +42,7 @@ func TestExtractManifest(t *testing.T) {
|
|||||||
require.NoError(t, tmpBolt.Close())
|
require.NoError(t, tmpBolt.Close())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
extracted, err := extractBucketManifest(tmpBoltPath)
|
extracted, err := backup_restore.ExtractBucketMetadata(tmpBoltPath)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
expected := []api.BucketMetadataManifest{
|
expected := []api.BucketMetadataManifest{
|
@ -3,6 +3,8 @@ package backup_restore
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/influx-cli/v2/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
type FileCompression int
|
type FileCompression int
|
||||||
@ -35,9 +37,13 @@ func (c FileCompression) String() string {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const ManifestExtension = "manifest"
|
const (
|
||||||
|
ManifestExtension = "manifest"
|
||||||
|
ManifestVersion = 2
|
||||||
|
)
|
||||||
|
|
||||||
type Manifest struct {
|
type Manifest struct {
|
||||||
|
Version int `json:"manifestVersion"`
|
||||||
KV ManifestFileEntry `json:"kv"`
|
KV ManifestFileEntry `json:"kv"`
|
||||||
SQL *ManifestFileEntry `json:"sql,omitempty"`
|
SQL *ManifestFileEntry `json:"sql,omitempty"`
|
||||||
Buckets []ManifestBucketEntry `json:"buckets"`
|
Buckets []ManifestBucketEntry `json:"buckets"`
|
||||||
@ -92,3 +98,103 @@ type ManifestSubscription struct {
|
|||||||
Mode string `json:"mode"`
|
Mode string `json:"mode"`
|
||||||
Destinations []string `json:"destinations"`
|
Destinations []string `json:"destinations"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ConvertBucketManifest(manifest api.BucketMetadataManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (ManifestBucketEntry, error) {
|
||||||
|
m := ManifestBucketEntry{
|
||||||
|
OrganizationID: manifest.OrganizationID,
|
||||||
|
OrganizationName: manifest.OrganizationName,
|
||||||
|
BucketID: manifest.BucketID,
|
||||||
|
BucketName: manifest.BucketName,
|
||||||
|
Description: manifest.Description,
|
||||||
|
DefaultRetentionPolicy: manifest.DefaultRetentionPolicy,
|
||||||
|
RetentionPolicies: make([]ManifestRetentionPolicy, len(manifest.RetentionPolicies)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, rp := range manifest.RetentionPolicies {
|
||||||
|
var err error
|
||||||
|
m.RetentionPolicies[i], err = ConvertRetentionPolicy(rp, getShard)
|
||||||
|
if err != nil {
|
||||||
|
return ManifestBucketEntry{}, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ConvertRetentionPolicy(manifest api.RetentionPolicyManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (ManifestRetentionPolicy, error) {
|
||||||
|
m := ManifestRetentionPolicy{
|
||||||
|
Name: manifest.Name,
|
||||||
|
ReplicaN: manifest.ReplicaN,
|
||||||
|
Duration: manifest.Duration,
|
||||||
|
ShardGroupDuration: manifest.ShardGroupDuration,
|
||||||
|
ShardGroups: make([]ManifestShardGroup, len(manifest.ShardGroups)),
|
||||||
|
Subscriptions: make([]ManifestSubscription, len(manifest.Subscriptions)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, sg := range manifest.ShardGroups {
|
||||||
|
var err error
|
||||||
|
m.ShardGroups[i], err = ConvertShardGroup(sg, getShard)
|
||||||
|
if err != nil {
|
||||||
|
return ManifestRetentionPolicy{}, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, s := range manifest.Subscriptions {
|
||||||
|
m.Subscriptions[i] = ManifestSubscription{
|
||||||
|
Name: s.Name,
|
||||||
|
Mode: s.Mode,
|
||||||
|
Destinations: s.Destinations,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ConvertShardGroup(manifest api.ShardGroupManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (ManifestShardGroup, error) {
|
||||||
|
m := ManifestShardGroup{
|
||||||
|
ID: manifest.Id,
|
||||||
|
StartTime: manifest.StartTime,
|
||||||
|
EndTime: manifest.EndTime,
|
||||||
|
DeletedAt: manifest.DeletedAt,
|
||||||
|
TruncatedAt: manifest.TruncatedAt,
|
||||||
|
Shards: make([]ManifestShardEntry, 0, len(manifest.Shards)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, sh := range manifest.Shards {
|
||||||
|
maybeShard, err := ConvertShard(sh, getShard)
|
||||||
|
if err != nil {
|
||||||
|
return ManifestShardGroup{}, err
|
||||||
|
}
|
||||||
|
// Shard deleted mid-backup.
|
||||||
|
if maybeShard == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
m.Shards = append(m.Shards, *maybeShard)
|
||||||
|
}
|
||||||
|
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func ConvertShard(manifest api.ShardManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (*ManifestShardEntry, error) {
|
||||||
|
shardFileInfo, err := getShard(manifest.Id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to download snapshot of shard %d: %w", manifest.Id, err)
|
||||||
|
}
|
||||||
|
if shardFileInfo == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
m := ManifestShardEntry{
|
||||||
|
ID: manifest.Id,
|
||||||
|
ShardOwners: make([]ShardOwnerEntry, len(manifest.ShardOwners)),
|
||||||
|
ManifestFileEntry: *shardFileInfo,
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, o := range manifest.ShardOwners {
|
||||||
|
m.ShardOwners[i] = ShardOwnerEntry{
|
||||||
|
NodeID: o.NodeID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &m, nil
|
||||||
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package backup_test
|
package backup_restore_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -6,7 +6,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/influx-cli/v2/api"
|
"github.com/influxdata/influx-cli/v2/api"
|
||||||
"github.com/influxdata/influx-cli/v2/clients/backup"
|
|
||||||
br "github.com/influxdata/influx-cli/v2/internal/backup_restore"
|
br "github.com/influxdata/influx-cli/v2/internal/backup_restore"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
@ -99,7 +98,7 @@ func TestConvertBucketManifest(t *testing.T) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
converted, err := backup.ConvertBucketManifest(manifest, fakeGetShard)
|
converted, err := br.ConvertBucketManifest(manifest, fakeGetShard)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
expected := br.ManifestBucketEntry{
|
expected := br.ManifestBucketEntry{
|
Reference in New Issue
Block a user