diff --git a/clients/backup/backup.go b/clients/backup/backup.go index 2db32da..a37b7d1 100644 --- a/clients/backup/backup.go +++ b/clients/backup/backup.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/influx-cli/v2/api" "github.com/influxdata/influx-cli/v2/clients" + br "github.com/influxdata/influx-cli/v2/internal/backup_restore" ) type Client struct { @@ -24,7 +25,7 @@ type Client struct { // Local state tracked across steps in the backup process. baseName string bucketMetadata []api.BucketMetadataManifest - manifest Manifest + manifest br.Manifest } type Params struct { @@ -42,7 +43,7 @@ type Params struct { Path string // Compression to use for local copies of snapshot files. - Compression FileCompression + Compression br.FileCompression } func (p *Params) matches(bkt api.BucketMetadataManifest) bool { @@ -104,9 +105,9 @@ func (c *Client) downloadMetadata(ctx context.Context, params *Params) error { } defer body.Close() - writeFile := func(from io.Reader, to string) (ManifestFileEntry, error) { + writeFile := func(from io.Reader, to string) (br.ManifestFileEntry, error) { toPath := filepath.Join(params.Path, to) - if params.Compression == GzipCompression { + if params.Compression == br.GzipCompression { toPath = toPath + ".gz" } @@ -120,7 +121,7 @@ func (c *Client) downloadMetadata(ctx context.Context, params *Params) error { defer out.Close() var outW io.Writer = out - if params.Compression == GzipCompression { + if params.Compression == br.GzipCompression { gw := gzip.NewWriter(out) defer gw.Close() outW = gw @@ -129,14 +130,14 @@ func (c *Client) downloadMetadata(ctx context.Context, params *Params) error { _, err = io.Copy(outW, from) return err }(); err != nil { - return ManifestFileEntry{}, err + return br.ManifestFileEntry{}, err } fi, err := os.Stat(toPath) if err != nil { - return ManifestFileEntry{}, err + return br.ManifestFileEntry{}, err } - return ManifestFileEntry{ + return br.ManifestFileEntry{ FileName: fi.Name(), Size: fi.Size(), 
Compression: params.Compression, @@ -185,12 +186,12 @@ func (c *Client) downloadMetadata(ctx context.Context, params *Params) error { // // Bucket metadata must be pre-seeded via downloadMetadata before this method is called. func (c *Client) downloadBucketData(ctx context.Context, params *Params) error { - c.manifest.Buckets = make([]ManifestBucketEntry, 0, len(c.bucketMetadata)) + c.manifest.Buckets = make([]br.ManifestBucketEntry, 0, len(c.bucketMetadata)) for _, b := range c.bucketMetadata { if !params.matches(b) { continue } - bktManifest, err := ConvertBucketManifest(b, func(shardId int64) (*ManifestFileEntry, error) { + bktManifest, err := ConvertBucketManifest(b, func(shardId int64) (*br.ManifestFileEntry, error) { return c.downloadShardData(ctx, params, shardId) }) if err != nil { @@ -203,7 +204,7 @@ func (c *Client) downloadBucketData(ctx context.Context, params *Params) error { // downloadShardData downloads the TSM snapshot for a single shard. The snapshot is written // to a local file, and its metadata is returned for aggregation. 
-func (c Client) downloadShardData(ctx context.Context, params *Params, shardId int64) (*ManifestFileEntry, error) { +func (c Client) downloadShardData(ctx context.Context, params *Params, shardId int64) (*br.ManifestFileEntry, error) { log.Printf("INFO: Backing up TSM for shard %d", shardId) res, err := c.GetBackupShardId(ctx, shardId).AcceptEncoding("gzip").Execute() if err != nil { @@ -218,7 +219,7 @@ func (c Client) downloadShardData(ctx context.Context, params *Params, shardId i defer res.Body.Close() fileName := fmt.Sprintf("%s.%d.tar", c.baseName, shardId) - if params.Compression == GzipCompression { + if params.Compression == br.GzipCompression { fileName = fileName + ".gz" } path := filepath.Join(params.Path, fileName) @@ -236,12 +237,12 @@ func (c Client) downloadShardData(ctx context.Context, params *Params, shardId i var outW io.Writer = f // Make sure the locally-written data is compressed according to the user's request. - if params.Compression == GzipCompression && res.Header.Get("Content-Encoding") != "gzip" { + if params.Compression == br.GzipCompression && res.Header.Get("Content-Encoding") != "gzip" { gzw := gzip.NewWriter(outW) defer gzw.Close() outW = gzw } - if params.Compression == NoCompression && res.Header.Get("Content-Encoding") == "gzip" { + if params.Compression == br.NoCompression && res.Header.Get("Content-Encoding") == "gzip" { gzr, err := gzip.NewReader(inR) if err != nil { return err @@ -260,7 +261,7 @@ func (c Client) downloadShardData(ctx context.Context, params *Params, shardId i if err != nil { return nil, err } - return &ManifestFileEntry{ + return &br.ManifestFileEntry{ FileName: fi.Name(), Size: fi.Size(), Compression: params.Compression, @@ -270,7 +271,7 @@ func (c Client) downloadShardData(ctx context.Context, params *Params, shardId i // writeManifest writes a description of all files downloaded as part of the backup process // to the backup folder, encoded as JSON. 
func (c Client) writeManifest(params *Params) error { - manifestPath := filepath.Join(params.Path, fmt.Sprintf("%s.manifest", c.baseName)) + manifestPath := filepath.Join(params.Path, fmt.Sprintf("%s.%s", c.baseName, br.ManifestExtension)) f, err := os.OpenFile(manifestPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) if err != nil { return err diff --git a/clients/backup/backup_internal_test.go b/clients/backup/backup_internal_test.go index c73dfc7..643a2fa 100644 --- a/clients/backup/backup_internal_test.go +++ b/clients/backup/backup_internal_test.go @@ -18,6 +18,7 @@ import ( "github.com/golang/mock/gomock" "github.com/influxdata/influx-cli/v2/api" "github.com/influxdata/influx-cli/v2/clients" + br "github.com/influxdata/influx-cli/v2/internal/backup_restore" "github.com/influxdata/influx-cli/v2/internal/mock" "github.com/stretchr/testify/require" ) @@ -55,28 +56,28 @@ func TestBackup_DownloadMetadata(t *testing.T) { testCases := []struct { name string - compression FileCompression - responseCompression FileCompression + compression br.FileCompression + responseCompression br.FileCompression }{ { name: "no gzip", - compression: NoCompression, - responseCompression: NoCompression, + compression: br.NoCompression, + responseCompression: br.NoCompression, }, { name: "response gzip, no local gzip", - compression: NoCompression, - responseCompression: GzipCompression, + compression: br.NoCompression, + responseCompression: br.GzipCompression, }, { name: "no response gzip, local gzip", - compression: GzipCompression, - responseCompression: NoCompression, + compression: br.GzipCompression, + responseCompression: br.NoCompression, }, { name: "all gzip", - compression: GzipCompression, - responseCompression: GzipCompression, + compression: br.GzipCompression, + responseCompression: br.GzipCompression, }, } @@ -94,7 +95,7 @@ func TestBackup_DownloadMetadata(t *testing.T) { DoAndReturn(func(request api.ApiGetBackupMetadataRequest) (*http.Response, error) { out := 
bytes.Buffer{} var outW io.Writer = &out - if tc.responseCompression == GzipCompression { + if tc.responseCompression == br.GzipCompression { gzw := gzip.NewWriter(outW) defer gzw.Close() outW = gzw @@ -144,7 +145,7 @@ func TestBackup_DownloadMetadata(t *testing.T) { res := http.Response{Header: http.Header{}, Body: ioutil.NopCloser(&out)} res.Header.Add("Content-Type", fmt.Sprintf("multipart/mixed; boundary=%s", writer.Boundary())) - if tc.responseCompression == GzipCompression { + if tc.responseCompression == br.GzipCompression { res.Header.Add("Content-Encoding", "gzip") } return &res, nil @@ -173,7 +174,7 @@ func TestBackup_DownloadMetadata(t *testing.T) { defer localKv.Close() var kvReader io.Reader = localKv - if tc.compression == GzipCompression { + if tc.compression == br.GzipCompression { gzr, err := gzip.NewReader(kvReader) require.NoError(t, err) defer gzr.Close() @@ -188,7 +189,7 @@ func TestBackup_DownloadMetadata(t *testing.T) { defer localSql.Close() var sqlReader io.Reader = localSql - if tc.compression == GzipCompression { + if tc.compression == br.GzipCompression { gzr, err := gzip.NewReader(sqlReader) require.NoError(t, err) defer gzr.Close() @@ -208,28 +209,28 @@ func TestBackup_DownloadShardData(t *testing.T) { testCases := []struct { name string - compression FileCompression - responseCompression FileCompression + compression br.FileCompression + responseCompression br.FileCompression }{ { name: "no gzip", - compression: NoCompression, - responseCompression: NoCompression, + compression: br.NoCompression, + responseCompression: br.NoCompression, }, { name: "response gzip, no local gzip", - compression: NoCompression, - responseCompression: GzipCompression, + compression: br.NoCompression, + responseCompression: br.GzipCompression, }, { name: "no response gzip, local gzip", - compression: GzipCompression, - responseCompression: NoCompression, + compression: br.GzipCompression, + responseCompression: br.NoCompression, }, { name: "all gzip", - 
compression: GzipCompression, - responseCompression: GzipCompression, + compression: br.GzipCompression, + responseCompression: br.GzipCompression, }, } @@ -247,7 +248,7 @@ func TestBackup_DownloadShardData(t *testing.T) { DoAndReturn(func(api.ApiGetBackupShardIdRequest) (*http.Response, error) { out := bytes.Buffer{} var outW io.Writer = &out - if tc.responseCompression == GzipCompression { + if tc.responseCompression == br.GzipCompression { gzw := gzip.NewWriter(outW) defer gzw.Close() outW = gzw @@ -256,7 +257,7 @@ func TestBackup_DownloadShardData(t *testing.T) { require.NoError(t, err) res := http.Response{Header: http.Header{}, Body: ioutil.NopCloser(&out)} res.Header.Add("Content-Type", "application/octet-stream") - if tc.responseCompression == GzipCompression { + if tc.responseCompression == br.GzipCompression { res.Header.Add("Content-Encoding", "gzip") } return &res, nil @@ -285,7 +286,7 @@ func TestBackup_DownloadShardData(t *testing.T) { defer localShard.Close() var shardReader io.Reader = localShard - if tc.compression == GzipCompression { + if tc.compression == br.GzipCompression { gzr, err := gzip.NewReader(shardReader) require.NoError(t, err) defer gzr.Close() diff --git a/clients/backup/manifest.go b/clients/backup/manifest.go index 28c3b1b..fa8aab2 100644 --- a/clients/backup/manifest.go +++ b/clients/backup/manifest.go @@ -2,94 +2,53 @@ package backup import ( "fmt" - "time" "github.com/influxdata/influx-cli/v2/api" + br "github.com/influxdata/influx-cli/v2/internal/backup_restore" ) -type FileCompression int - -const ( - NoCompression FileCompression = iota - GzipCompression -) - -func (c *FileCompression) Set(v string) error { - switch v { - case "none": - *c = NoCompression - case "gzip": - *c = GzipCompression - default: - return fmt.Errorf("unsupported format: %q", v) - } - return nil -} - -func (c FileCompression) String() string { - switch c { - case NoCompression: - return "none" - case GzipCompression: - return "gzip" - default: - 
panic("Impossible!") - } -} - -type Manifest struct { - KV ManifestFileEntry `json:"kv"` - SQL ManifestFileEntry `json:"sql"` - Buckets []ManifestBucketEntry `json:"buckets"` -} - -type ManifestFileEntry struct { - FileName string `json:"fileName"` - Size int64 `json:"size"` - Compression FileCompression `json:"compression"` -} - -func ConvertBucketManifest(manifest api.BucketMetadataManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (ManifestBucketEntry, error) { - m := ManifestBucketEntry{ +func ConvertBucketManifest(manifest api.BucketMetadataManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (br.ManifestBucketEntry, error) { + m := br.ManifestBucketEntry{ OrganizationID: manifest.OrganizationID, OrganizationName: manifest.OrganizationName, BucketID: manifest.BucketID, BucketName: manifest.BucketName, + Description: manifest.Description, DefaultRetentionPolicy: manifest.DefaultRetentionPolicy, - RetentionPolicies: make([]ManifestRetentionPolicy, len(manifest.RetentionPolicies)), + RetentionPolicies: make([]br.ManifestRetentionPolicy, len(manifest.RetentionPolicies)), } for i, rp := range manifest.RetentionPolicies { var err error m.RetentionPolicies[i], err = ConvertRetentionPolicy(rp, getShard) if err != nil { - return ManifestBucketEntry{}, err + return br.ManifestBucketEntry{}, err } } return m, nil } -func ConvertRetentionPolicy(manifest api.RetentionPolicyManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (ManifestRetentionPolicy, error) { - m := ManifestRetentionPolicy{ +func ConvertRetentionPolicy(manifest api.RetentionPolicyManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (br.ManifestRetentionPolicy, error) { + m := br.ManifestRetentionPolicy{ Name: manifest.Name, ReplicaN: manifest.ReplicaN, Duration: manifest.Duration, ShardGroupDuration: manifest.ShardGroupDuration, - ShardGroups: make([]ManifestShardGroup, len(manifest.ShardGroups)), - Subscriptions: 
make([]ManifestSubscription, len(manifest.Subscriptions)), + ShardGroups: make([]br.ManifestShardGroup, len(manifest.ShardGroups)), + Subscriptions: make([]br.ManifestSubscription, len(manifest.Subscriptions)), } for i, sg := range manifest.ShardGroups { var err error m.ShardGroups[i], err = ConvertShardGroup(sg, getShard) if err != nil { - return ManifestRetentionPolicy{}, err + return br.ManifestRetentionPolicy{}, err } } for i, s := range manifest.Subscriptions { - m.Subscriptions[i] = ManifestSubscription{ + m.Subscriptions[i] = br.ManifestSubscription{ Name: s.Name, Mode: s.Mode, Destinations: s.Destinations, @@ -99,20 +58,20 @@ func ConvertRetentionPolicy(manifest api.RetentionPolicyManifest, getShard func( return m, nil } -func ConvertShardGroup(manifest api.ShardGroupManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (ManifestShardGroup, error) { - m := ManifestShardGroup{ +func ConvertShardGroup(manifest api.ShardGroupManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (br.ManifestShardGroup, error) { + m := br.ManifestShardGroup{ ID: manifest.Id, StartTime: manifest.StartTime, EndTime: manifest.EndTime, DeletedAt: manifest.DeletedAt, TruncatedAt: manifest.TruncatedAt, - Shards: make([]ManifestShardEntry, 0, len(manifest.Shards)), + Shards: make([]br.ManifestShardEntry, 0, len(manifest.Shards)), } for _, sh := range manifest.Shards { maybeShard, err := ConvertShard(sh, getShard) if err != nil { - return ManifestShardGroup{}, err + return br.ManifestShardGroup{}, err } // Shard deleted mid-backup. 
if maybeShard == nil { @@ -124,7 +83,7 @@ func ConvertShardGroup(manifest api.ShardGroupManifest, getShard func(shardId in return m, nil } -func ConvertShard(manifest api.ShardManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (*ManifestShardEntry, error) { +func ConvertShard(manifest api.ShardManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (*br.ManifestShardEntry, error) { shardFileInfo, err := getShard(manifest.Id) if err != nil { return nil, fmt.Errorf("failed to download snapshot of shard %d: %w", manifest.Id, err) @@ -133,60 +92,17 @@ func ConvertShard(manifest api.ShardManifest, getShard func(shardId int64) (*Man return nil, nil } - m := ManifestShardEntry{ + m := br.ManifestShardEntry{ ID: manifest.Id, - ShardOwners: make([]ShardOwner, len(manifest.ShardOwners)), + ShardOwners: make([]br.ShardOwner, len(manifest.ShardOwners)), ManifestFileEntry: *shardFileInfo, } for i, o := range manifest.ShardOwners { - m.ShardOwners[i] = ShardOwner{ + m.ShardOwners[i] = br.ShardOwner{ NodeID: o.NodeID, } } return &m, nil } - -type ManifestBucketEntry struct { - OrganizationID string `json:"organizationID"` - OrganizationName string `json:"organizationName"` - BucketID string `json:"bucketID"` - BucketName string `json:"bucketName"` - DefaultRetentionPolicy string `json:"defaultRetentionPolicy"` - RetentionPolicies []ManifestRetentionPolicy `json:"retentionPolicies"` -} - -type ManifestRetentionPolicy struct { - Name string `json:"name"` - ReplicaN int32 `json:"replicaN"` - Duration int64 `json:"duration"` - ShardGroupDuration int64 `json:"shardGroupDuration"` - ShardGroups []ManifestShardGroup `json:"shardGroups"` - Subscriptions []ManifestSubscription `json:"subscriptions"` -} - -type ManifestShardGroup struct { - ID int64 `json:"id"` - StartTime time.Time `json:"startTime"` - EndTime time.Time `json:"endTime"` - DeletedAt *time.Time `json:"deletedAt,omitempty"` - TruncatedAt *time.Time `json:"truncatedAt,omitempty"` - Shards 
[]ManifestShardEntry `json:"shards"` -} - -type ManifestShardEntry struct { - ID int64 `json:"id"` - ShardOwners []ShardOwner `json:"shardOwners"` - ManifestFileEntry -} - -type ShardOwner struct { - NodeID int64 `json:"nodeID"` -} - -type ManifestSubscription struct { - Name string `json:"name"` - Mode string `json:"mode"` - Destinations []string `json:"destinations"` -} diff --git a/clients/backup/manifest_test.go b/clients/backup/manifest_test.go index fc02d1b..f188fdf 100644 --- a/clients/backup/manifest_test.go +++ b/clients/backup/manifest_test.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/influx-cli/v2/api" "github.com/influxdata/influx-cli/v2/clients/backup" + br "github.com/influxdata/influx-cli/v2/internal/backup_restore" "github.com/stretchr/testify/require" ) @@ -87,46 +88,46 @@ func TestConvertBucketManifest(t *testing.T) { }, } - fakeGetShard := func(id int64) (*backup.ManifestFileEntry, error) { + fakeGetShard := func(id int64) (*br.ManifestFileEntry, error) { if id == 20 { return nil, nil } - return &backup.ManifestFileEntry{ + return &br.ManifestFileEntry{ FileName: fmt.Sprintf("%d.gz", id), Size: id * 100, - Compression: backup.GzipCompression, + Compression: br.GzipCompression, }, nil } converted, err := backup.ConvertBucketManifest(manifest, fakeGetShard) require.NoError(t, err) - expected := backup.ManifestBucketEntry{ + expected := br.ManifestBucketEntry{ OrganizationID: "123", OrganizationName: "org", BucketID: "456", BucketName: "bucket", DefaultRetentionPolicy: "foo", - RetentionPolicies: []backup.ManifestRetentionPolicy{ + RetentionPolicies: []br.ManifestRetentionPolicy{ { Name: "foo", ReplicaN: 1, Duration: 100, ShardGroupDuration: 10, - ShardGroups: []backup.ManifestShardGroup{ + ShardGroups: []br.ManifestShardGroup{ { ID: 1, StartTime: now, EndTime: now, TruncatedAt: &now, - Shards: []backup.ManifestShardEntry{ + Shards: []br.ManifestShardEntry{ { ID: 10, - ShardOwners: []backup.ShardOwner{{NodeID: 1}}, - ManifestFileEntry: 
backup.ManifestFileEntry{ + ShardOwners: []br.ShardOwner{{NodeID: 1}}, + ManifestFileEntry: br.ManifestFileEntry{ FileName: "10.gz", Size: 1000, - Compression: backup.GzipCompression, + Compression: br.GzipCompression, }, }, }, @@ -136,35 +137,35 @@ func TestConvertBucketManifest(t *testing.T) { StartTime: now, EndTime: now, DeletedAt: &now, - Shards: []backup.ManifestShardEntry{ + Shards: []br.ManifestShardEntry{ { ID: 30, - ShardOwners: []backup.ShardOwner{}, - ManifestFileEntry: backup.ManifestFileEntry{ + ShardOwners: []br.ShardOwner{}, + ManifestFileEntry: br.ManifestFileEntry{ FileName: "30.gz", Size: 3000, - Compression: backup.GzipCompression, + Compression: br.GzipCompression, }, }, }, }, }, - Subscriptions: []backup.ManifestSubscription{}, + Subscriptions: []br.ManifestSubscription{}, }, { Name: "bar", ReplicaN: 3, Duration: 9999, ShardGroupDuration: 1, - ShardGroups: []backup.ManifestShardGroup{ + ShardGroups: []br.ManifestShardGroup{ { ID: 3, StartTime: now, EndTime: now, - Shards: []backup.ManifestShardEntry{}, + Shards: []br.ManifestShardEntry{}, }, }, - Subscriptions: []backup.ManifestSubscription{ + Subscriptions: []br.ManifestSubscription{ { Name: "test", Mode: "on", diff --git a/clients/restore/manifest.go b/clients/restore/manifest.go new file mode 100644 index 0000000..284ac13 --- /dev/null +++ b/clients/restore/manifest.go @@ -0,0 +1,79 @@ +package restore + +import ( + "github.com/influxdata/influx-cli/v2/api" + br "github.com/influxdata/influx-cli/v2/internal/backup_restore" +) + +func ConvertBucketManifest(manifest br.ManifestBucketEntry) api.BucketMetadataManifest { + m := api.BucketMetadataManifest{ + OrganizationID: manifest.OrganizationID, + OrganizationName: manifest.OrganizationName, + BucketID: manifest.BucketID, + BucketName: manifest.BucketName, + Description: manifest.Description, + DefaultRetentionPolicy: manifest.DefaultRetentionPolicy, + RetentionPolicies: make([]api.RetentionPolicyManifest, len(manifest.RetentionPolicies)), + } 
+ + for i, rp := range manifest.RetentionPolicies { + m.RetentionPolicies[i] = ConvertRetentionPolicy(rp) + } + + return m +} + +func ConvertRetentionPolicy(manifest br.ManifestRetentionPolicy) api.RetentionPolicyManifest { + m := api.RetentionPolicyManifest{ + Name: manifest.Name, + ReplicaN: manifest.ReplicaN, + Duration: manifest.Duration, + ShardGroupDuration: manifest.ShardGroupDuration, + ShardGroups: make([]api.ShardGroupManifest, len(manifest.ShardGroups)), + Subscriptions: make([]api.SubscriptionManifest, len(manifest.Subscriptions)), + } + + for i, sg := range manifest.ShardGroups { + m.ShardGroups[i] = ConvertShardGroup(sg) + } + + for i, s := range manifest.Subscriptions { + m.Subscriptions[i] = api.SubscriptionManifest{ + Name: s.Name, + Mode: s.Mode, + Destinations: s.Destinations, + } + } + + return m +} + +func ConvertShardGroup(manifest br.ManifestShardGroup) api.ShardGroupManifest { + m := api.ShardGroupManifest{ + Id: manifest.ID, + StartTime: manifest.StartTime, + EndTime: manifest.EndTime, + DeletedAt: manifest.DeletedAt, + TruncatedAt: manifest.TruncatedAt, + Shards: make([]api.ShardManifest, len(manifest.Shards)), + } + + for i, sh := range manifest.Shards { + m.Shards[i] = ConvertShard(sh) + } + + return m +} + +func ConvertShard(manifest br.ManifestShardEntry) api.ShardManifest { + m := api.ShardManifest{ + Id: manifest.ID, + ShardOwners: make([]api.ShardOwner, len(manifest.ShardOwners)), + } + + for i, so := range manifest.ShardOwners { + m.ShardOwners[i] = api.ShardOwner{NodeID: so.NodeID} + } + + return m +} diff --git a/clients/restore/restore.go b/clients/restore/restore.go new file mode 100644 index 0000000..83a7fbd --- /dev/null +++ b/clients/restore/restore.go @@ -0,0 +1,292 @@ +package restore + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "os" + "path/filepath" + "sort" + "strings" + + "github.com/influxdata/influx-cli/v2/api" + "github.com/influxdata/influx-cli/v2/clients" + br 
"github.com/influxdata/influx-cli/v2/internal/backup_restore" + "github.com/influxdata/influx-cli/v2/pkg/gzip" +) + +type Client struct { + clients.CLI + api.RestoreApi + api.OrganizationsApi + + manifest br.Manifest +} + +type Params struct { + // Path to local backup data created using `influx backup` + Path string + + // Original ID/name of the organization to restore. + // If not set, all orgs will be restored. + OrgID string + Org string + + // New name to use for the restored organization. + // If not set, the org will be restored using its backed-up name. + NewOrgName string + + // Original ID/name of the bucket to restore. + // If not set, all buckets within the org filter will be restored. + BucketID string + Bucket string + + // New name to use for the restored bucket. + // If not set, the bucket will be restored using its backed-up name. + NewBucketName string + + // If true, replace all data on the server with the local backup. + // Otherwise only restore the requested org/bucket, leaving other data untouched. + Full bool +} + +func (p *Params) matches(bkt br.ManifestBucketEntry) bool { + if p.OrgID != "" && bkt.OrganizationID != p.OrgID { + return false + } + if p.Org != "" && bkt.OrganizationName != p.Org { + return false + } + if p.BucketID != "" && bkt.BucketID != p.BucketID { + return false + } + if p.Bucket != "" && bkt.BucketName != p.Bucket { + return false + } + return true +} + +func (c *Client) Restore(ctx context.Context, params *Params) error { + if err := c.loadManifests(params.Path); err != nil { + return err + } + if params.Full { + return c.fullRestore(ctx, params.Path) + } + return c.partialRestore(ctx, params) +} + +// loadManifests finds and merges all backup manifests stored in a given directory, +// keeping the latest top-level metadata and latest metadata per-bucket. +func (c *Client) loadManifests(path string) error { + // Read all manifest files from path, sort in ascending time. 
+ manifests, err := filepath.Glob(filepath.Join(path, fmt.Sprintf("*.%s", br.ManifestExtension))) + if err != nil { + return fmt.Errorf("failed to find backup manifests at %q: %w", path, err) + } else if len(manifests) == 0 { + return fmt.Errorf("no backup manifests found at %q", path) + } + sort.Strings(manifests) + + bucketManifests := map[string]br.ManifestBucketEntry{} + for _, manifestFile := range manifests { + // Skip file if it is a directory. + if fi, err := os.Stat(manifestFile); err != nil { + return fmt.Errorf("failed to inspect local manifest at %q: %w", manifestFile, err) + } else if fi.IsDir() { + continue + } + + var manifest br.Manifest + if buf, err := os.ReadFile(manifestFile); err != nil { + return fmt.Errorf("failed to read local manifest at %q: %w", manifestFile, err) + } else if err := json.Unmarshal(buf, &manifest); err != nil { + return fmt.Errorf("failed to parse manifest at %q: %w", manifestFile, err) + } + + // Keep the latest KV and SQL overall. + c.manifest.KV = manifest.KV + c.manifest.SQL = manifest.SQL + + // Keep the latest manifest per-bucket. 
+ for _, bkt := range manifest.Buckets { + bucketManifests[bkt.BucketID] = bkt + } + } + + c.manifest.Buckets = make([]br.ManifestBucketEntry, 0, len(bucketManifests)) + for _, bkt := range bucketManifests { + c.manifest.Buckets = append(c.manifest.Buckets, bkt) + } + + return nil +} + +func (c Client) fullRestore(ctx context.Context, path string) error { + // Make sure we can read both local metadata snapshots before + kvBytes, err := c.readFileGzipped(path, c.manifest.KV) + if err != nil { + return fmt.Errorf("failed to open local KV backup at %q: %w", filepath.Join(path, c.manifest.KV.FileName), err) + } + defer kvBytes.Close() + sqlBytes, err := c.readFileGzipped(path, c.manifest.SQL) + if err != nil { + return fmt.Errorf("failed to open local SQL backup at %q: %w", filepath.Join(path, c.manifest.SQL.FileName), err) + } + defer sqlBytes.Close() + + // Upload metadata snapshots to the server. + log.Println("INFO: Restoring KV snapshot") + if err := c.PostRestoreKV(ctx). + ContentEncoding("gzip"). + ContentType("application/octet-stream"). + Body(kvBytes). + Execute(); err != nil { + return fmt.Errorf("failed to restore KV snapshot: %w", err) + } + log.Println("INFO: Restoring SQL snapshot") + if err := c.PostRestoreSQL(ctx). + ContentEncoding("gzip"). + ContentType("application/octet-stream"). + Body(sqlBytes). + Execute(); err != nil { + return fmt.Errorf("failed to restore SQL snapshot: %w", err) + } + + // Drill down through bucket manifests to reach shard info, and upload it. + for _, b := range c.manifest.Buckets { + for _, rp := range b.RetentionPolicies { + for _, sg := range rp.ShardGroups { + for _, s := range sg.Shards { + if err := c.restoreShard(ctx, path, s); err != nil { + return err + } + } + } + } + } + + return nil +} + +func (c Client) partialRestore(ctx context.Context, params *Params) (err error) { + orgIds := map[string]string{} + + for _, bkt := range c.manifest.Buckets { + // Skip internal buckets. 
+ if strings.HasPrefix(bkt.BucketName, "_") { + continue + } + if !params.matches(bkt) { + continue + } + + orgName := bkt.OrganizationName + // Before this method is called, we ensure that new-org-name is only set if + // a filter on org-name or org-id is set. If that check passes and execution + // reaches this code, we can assume that all buckets matching the filter come + // from the same org, so we can swap in the new org name unconditionally. + if params.NewOrgName != "" { + orgName = params.NewOrgName + } + + if _, ok := orgIds[orgName]; !ok { + orgIds[orgName], err = c.restoreOrg(ctx, orgName) + if err != nil { + return + } + } + + // By the same reasoning as above, if new-bucket-name is non-empty we know + // filters must have been set to ensure we only match 1 bucket, so we can + // swap the name without additional checks. + if params.NewBucketName != "" { + bkt.BucketName = params.NewBucketName + } + + log.Printf("INFO: Restoring bucket %q as %q\n", bkt.BucketID, bkt.BucketName) + bucketMapping, err := c.PostRestoreBucketMetadata(ctx). + BucketMetadataManifest(ConvertBucketManifest(bkt)). + Execute() + if err != nil { + return fmt.Errorf("failed to restore bucket %q: %w", bkt.BucketName, err) + } + + shardIdMap := make(map[int64]int64, len(bucketMapping.ShardMappings)) + for _, mapping := range bucketMapping.ShardMappings { + shardIdMap[mapping.OldId] = mapping.NewId + } + + for _, rp := range bkt.RetentionPolicies { + for _, sg := range rp.ShardGroups { + for _, sh := range sg.Shards { + newID, ok := shardIdMap[sh.ID] + if !ok { + log.Printf("WARN: Server didn't map ID for shard %d in bucket %q, skipping\n", sh.ID, bkt.BucketName) + continue + } + sh.ID = newID + if err := c.restoreShard(ctx, params.Path, sh); err != nil { + return err + } + } + } + } + } + + return nil +} + +// restoreOrg gets the ID for the org with the given name, creating the org if it doesn't already exist. 
+func (c Client) restoreOrg(ctx context.Context, name string) (string, error) { + orgs, err := c.GetOrgs(ctx).Org(name).Execute() + if err != nil { + return "", fmt.Errorf("failed to check existence of organization %q: %w", name, err) + } + + if len(orgs.GetOrgs()) == 0 { + // Create any missing orgs. + newOrg, err := c.PostOrgs(ctx).PostOrganizationRequest(api.PostOrganizationRequest{Name: name}).Execute() + if err != nil { + return "", fmt.Errorf("failed to create organization %q: %w", name, err) + } + return *newOrg.Id, nil + } + + return *orgs.GetOrgs()[0].Id, nil +} + +// readFileGzipped opens a local file and returns a reader of its contents, +// compressed with gzip. +func (c Client) readFileGzipped(path string, file br.ManifestFileEntry) (io.ReadCloser, error) { + fullPath := filepath.Join(path, file.FileName) + f, err := os.Open(fullPath) + if err != nil { + return nil, err + } + if file.Compression == br.GzipCompression { + return f, nil + } + return gzip.NewGzipPipe(f), nil +} + +func (c Client) restoreShard(ctx context.Context, path string, m br.ManifestShardEntry) error { + // Make sure we can read the local snapshot. + tsmBytes, err := c.readFileGzipped(path, m.ManifestFileEntry) + if err != nil { + return fmt.Errorf("failed to open local TSM snapshot at %q: %w", filepath.Join(path, m.FileName), err) + } + defer tsmBytes.Close() + + log.Printf("INFO: Restoring TSM snapshot for shard %d\n", m.ID) + if err := c.PostRestoreShardId(ctx, fmt.Sprintf("%d", m.ID)). + ContentEncoding("gzip"). + ContentType("application/octet-stream"). + Body(tsmBytes). 
+ Execute(); err != nil { + return fmt.Errorf("failed to restore TSM snapshot for shard %d: %w", m.ID, err) + } + return nil +} diff --git a/cmd/influx/backup.go b/cmd/influx/backup.go index 24260b0..67b81a9 100644 --- a/cmd/influx/backup.go +++ b/cmd/influx/backup.go @@ -4,6 +4,7 @@ import ( "errors" "github.com/influxdata/influx-cli/v2/clients/backup" + br "github.com/influxdata/influx-cli/v2/internal/backup_restore" "github.com/influxdata/influx-cli/v2/pkg/cli/middleware" "github.com/urfave/cli/v2" ) @@ -11,7 +12,7 @@ import ( func newBackupCmd() *cli.Command { var params backup.Params // Default to gzipping local files. - params.Compression = backup.GzipCompression + params.Compression = br.GzipCompression return &cli.Command{ Name: "backup", diff --git a/cmd/influx/main.go b/cmd/influx/main.go index 520e974..686f83c 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -45,6 +45,7 @@ var app = cli.App{ newUserCmd(), newTaskCommand(), newBackupCmd(), + newRestoreCmd(), newTelegrafsCommand(), }, } diff --git a/cmd/influx/restore.go b/cmd/influx/restore.go new file mode 100644 index 0000000..d9cfad1 --- /dev/null +++ b/cmd/influx/restore.go @@ -0,0 +1,98 @@ +package main + +import ( + "errors" + + "github.com/influxdata/influx-cli/v2/clients/restore" + "github.com/influxdata/influx-cli/v2/pkg/cli/middleware" + "github.com/urfave/cli/v2" +) + +func newRestoreCmd() *cli.Command { + var params restore.Params + + return &cli.Command{ + Name: "restore", + Usage: "Restores a backup directory to InfluxDB", + Description: `Restore influxdb. 
+ +Examples: + # restore all data + influx restore /path/to/restore +`, + ArgsUsage: "path", + Before: middleware.WithBeforeFns(withCli(), withApi(true)), + Flags: append( + commonFlagsNoPrint(), + &cli.BoolFlag{ + Name: "full", + Usage: "Fully restore and replace all data on server", + Destination: &params.Full, + }, + &cli.StringFlag{ + Name: "org-id", + Usage: "The original ID of the organization to restore", + EnvVars: []string{"INFLUX_ORG_ID"}, + Destination: &params.OrgID, + }, + &cli.StringFlag{ + Name: "org", + Usage: "The original name of the organization to restore", + Aliases: []string{"o"}, + EnvVars: []string{"INFLUX_ORG"}, + Destination: &params.Org, + }, + &cli.StringFlag{ + Name: "bucket-id", + Usage: "The original ID of the bucket to restore", + Destination: &params.BucketID, + }, + &cli.StringFlag{ + Name: "bucket", + Usage: "The original name of the bucket to restore", + Aliases: []string{"b"}, + Destination: &params.Bucket, + }, + &cli.StringFlag{ + Name: "new-bucket", + Usage: "New name to use for the restored bucket", + Destination: &params.NewBucketName, + }, + &cli.StringFlag{ + Name: "new-org", + Usage: "New name to use for the restored organization", + Destination: &params.NewOrgName, + }, + ), + Action: func(ctx *cli.Context) error { + if ctx.NArg() != 1 { + return errors.New("restore path must be specified as a single positional argument") + } + params.Path = ctx.Args().Get(0) + + if params.Full && (params.Org != "" || + params.OrgID != "" || + params.Bucket != "" || + params.BucketID != "" || + params.NewOrgName != "" || + params.NewBucketName != "") { + return errors.New("--full restore cannot be limited to a single org or bucket") + } + + if params.NewOrgName != "" && params.OrgID == "" && params.Org == "" { + return errors.New("--org-id or --org must be set to use --new-org") + } + if params.NewBucketName != "" && params.BucketID == "" && params.Bucket == "" { + return errors.New("--bucket-id or --bucket must be set to use --new-bucket") + } + + api := 
getAPI(ctx) + client := restore.Client{ + CLI: getCLI(ctx), + RestoreApi: api.RestoreApi, + OrganizationsApi: api.OrganizationsApi, + } + return client.Restore(ctx.Context, &params) + }, + } +} diff --git a/internal/backup_restore/manifest.go b/internal/backup_restore/manifest.go new file mode 100644 index 0000000..876e545 --- /dev/null +++ b/internal/backup_restore/manifest.go @@ -0,0 +1,94 @@ +package backup_restore + +import ( + "fmt" + "time" +) + +type FileCompression int + +const ( + NoCompression FileCompression = iota + GzipCompression +) + +func (c *FileCompression) Set(v string) error { + switch v { + case "none": + *c = NoCompression + case "gzip": + *c = GzipCompression + default: + return fmt.Errorf("unsupported format: %q", v) + } + return nil +} + +func (c FileCompression) String() string { + switch c { + case NoCompression: + return "none" + case GzipCompression: + return "gzip" + default: + panic("Impossible!") + } +} + +const ManifestExtension = "manifest" + +type Manifest struct { + KV ManifestFileEntry `json:"kv"` + SQL ManifestFileEntry `json:"sql"` + Buckets []ManifestBucketEntry `json:"buckets"` +} + +type ManifestFileEntry struct { + FileName string `json:"fileName"` + Size int64 `json:"size"` + Compression FileCompression `json:"compression"` +} + +type ManifestBucketEntry struct { + OrganizationID string `json:"organizationID"` + OrganizationName string `json:"organizationName"` + BucketID string `json:"bucketID"` + BucketName string `json:"bucketName"` + Description *string `json:"description,omitempty"` + DefaultRetentionPolicy string `json:"defaultRetentionPolicy"` + RetentionPolicies []ManifestRetentionPolicy `json:"retentionPolicies"` +} + +type ManifestRetentionPolicy struct { + Name string `json:"name"` + ReplicaN int32 `json:"replicaN"` + Duration int64 `json:"duration"` + ShardGroupDuration int64 `json:"shardGroupDuration"` + ShardGroups []ManifestShardGroup `json:"shardGroups"` + Subscriptions []ManifestSubscription 
`json:"subscriptions"` +} + +type ManifestShardGroup struct { + ID int64 `json:"id"` + StartTime time.Time `json:"startTime"` + EndTime time.Time `json:"endTime"` + DeletedAt *time.Time `json:"deletedAt,omitempty"` + TruncatedAt *time.Time `json:"truncatedAt,omitempty"` + Shards []ManifestShardEntry `json:"shards"` +} + +type ManifestShardEntry struct { + ID int64 `json:"id"` + ShardOwners []ShardOwner `json:"shardOwners"` + ManifestFileEntry +} + +type ShardOwner struct { + NodeID int64 `json:"nodeID"` +} + +type ManifestSubscription struct { + Name string `json:"name"` + Mode string `json:"mode"` + Destinations []string `json:"destinations"` +}