feat: implement influx restore using new APIs (#121)

This commit is contained in:
Daniel Moran
2021-06-15 16:45:55 -04:00
committed by GitHub
parent abe521add0
commit 6757c2bcfa
10 changed files with 650 additions and 166 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/influxdata/influx-cli/v2/api" "github.com/influxdata/influx-cli/v2/api"
"github.com/influxdata/influx-cli/v2/clients" "github.com/influxdata/influx-cli/v2/clients"
br "github.com/influxdata/influx-cli/v2/internal/backup_restore"
) )
type Client struct { type Client struct {
@ -24,7 +25,7 @@ type Client struct {
// Local state tracked across steps in the backup process. // Local state tracked across steps in the backup process.
baseName string baseName string
bucketMetadata []api.BucketMetadataManifest bucketMetadata []api.BucketMetadataManifest
manifest Manifest manifest br.Manifest
} }
type Params struct { type Params struct {
@ -42,7 +43,7 @@ type Params struct {
Path string Path string
// Compression to use for local copies of snapshot files. // Compression to use for local copies of snapshot files.
Compression FileCompression Compression br.FileCompression
} }
func (p *Params) matches(bkt api.BucketMetadataManifest) bool { func (p *Params) matches(bkt api.BucketMetadataManifest) bool {
@ -104,9 +105,9 @@ func (c *Client) downloadMetadata(ctx context.Context, params *Params) error {
} }
defer body.Close() defer body.Close()
writeFile := func(from io.Reader, to string) (ManifestFileEntry, error) { writeFile := func(from io.Reader, to string) (br.ManifestFileEntry, error) {
toPath := filepath.Join(params.Path, to) toPath := filepath.Join(params.Path, to)
if params.Compression == GzipCompression { if params.Compression == br.GzipCompression {
toPath = toPath + ".gz" toPath = toPath + ".gz"
} }
@ -120,7 +121,7 @@ func (c *Client) downloadMetadata(ctx context.Context, params *Params) error {
defer out.Close() defer out.Close()
var outW io.Writer = out var outW io.Writer = out
if params.Compression == GzipCompression { if params.Compression == br.GzipCompression {
gw := gzip.NewWriter(out) gw := gzip.NewWriter(out)
defer gw.Close() defer gw.Close()
outW = gw outW = gw
@ -129,14 +130,14 @@ func (c *Client) downloadMetadata(ctx context.Context, params *Params) error {
_, err = io.Copy(outW, from) _, err = io.Copy(outW, from)
return err return err
}(); err != nil { }(); err != nil {
return ManifestFileEntry{}, err return br.ManifestFileEntry{}, err
} }
fi, err := os.Stat(toPath) fi, err := os.Stat(toPath)
if err != nil { if err != nil {
return ManifestFileEntry{}, err return br.ManifestFileEntry{}, err
} }
return ManifestFileEntry{ return br.ManifestFileEntry{
FileName: fi.Name(), FileName: fi.Name(),
Size: fi.Size(), Size: fi.Size(),
Compression: params.Compression, Compression: params.Compression,
@ -185,12 +186,12 @@ func (c *Client) downloadMetadata(ctx context.Context, params *Params) error {
// //
// Bucket metadata must be pre-seeded via downloadMetadata before this method is called. // Bucket metadata must be pre-seeded via downloadMetadata before this method is called.
func (c *Client) downloadBucketData(ctx context.Context, params *Params) error { func (c *Client) downloadBucketData(ctx context.Context, params *Params) error {
c.manifest.Buckets = make([]ManifestBucketEntry, 0, len(c.bucketMetadata)) c.manifest.Buckets = make([]br.ManifestBucketEntry, 0, len(c.bucketMetadata))
for _, b := range c.bucketMetadata { for _, b := range c.bucketMetadata {
if !params.matches(b) { if !params.matches(b) {
continue continue
} }
bktManifest, err := ConvertBucketManifest(b, func(shardId int64) (*ManifestFileEntry, error) { bktManifest, err := ConvertBucketManifest(b, func(shardId int64) (*br.ManifestFileEntry, error) {
return c.downloadShardData(ctx, params, shardId) return c.downloadShardData(ctx, params, shardId)
}) })
if err != nil { if err != nil {
@ -203,7 +204,7 @@ func (c *Client) downloadBucketData(ctx context.Context, params *Params) error {
// downloadShardData downloads the TSM snapshot for a single shard. The snapshot is written // downloadShardData downloads the TSM snapshot for a single shard. The snapshot is written
// to a local file, and its metadata is returned for aggregation. // to a local file, and its metadata is returned for aggregation.
func (c Client) downloadShardData(ctx context.Context, params *Params, shardId int64) (*ManifestFileEntry, error) { func (c Client) downloadShardData(ctx context.Context, params *Params, shardId int64) (*br.ManifestFileEntry, error) {
log.Printf("INFO: Backing up TSM for shard %d", shardId) log.Printf("INFO: Backing up TSM for shard %d", shardId)
res, err := c.GetBackupShardId(ctx, shardId).AcceptEncoding("gzip").Execute() res, err := c.GetBackupShardId(ctx, shardId).AcceptEncoding("gzip").Execute()
if err != nil { if err != nil {
@ -218,7 +219,7 @@ func (c Client) downloadShardData(ctx context.Context, params *Params, shardId i
defer res.Body.Close() defer res.Body.Close()
fileName := fmt.Sprintf("%s.%d.tar", c.baseName, shardId) fileName := fmt.Sprintf("%s.%d.tar", c.baseName, shardId)
if params.Compression == GzipCompression { if params.Compression == br.GzipCompression {
fileName = fileName + ".gz" fileName = fileName + ".gz"
} }
path := filepath.Join(params.Path, fileName) path := filepath.Join(params.Path, fileName)
@ -236,12 +237,12 @@ func (c Client) downloadShardData(ctx context.Context, params *Params, shardId i
var outW io.Writer = f var outW io.Writer = f
// Make sure the locally-written data is compressed according to the user's request. // Make sure the locally-written data is compressed according to the user's request.
if params.Compression == GzipCompression && res.Header.Get("Content-Encoding") != "gzip" { if params.Compression == br.GzipCompression && res.Header.Get("Content-Encoding") != "gzip" {
gzw := gzip.NewWriter(outW) gzw := gzip.NewWriter(outW)
defer gzw.Close() defer gzw.Close()
outW = gzw outW = gzw
} }
if params.Compression == NoCompression && res.Header.Get("Content-Encoding") == "gzip" { if params.Compression == br.NoCompression && res.Header.Get("Content-Encoding") == "gzip" {
gzr, err := gzip.NewReader(inR) gzr, err := gzip.NewReader(inR)
if err != nil { if err != nil {
return err return err
@ -260,7 +261,7 @@ func (c Client) downloadShardData(ctx context.Context, params *Params, shardId i
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &ManifestFileEntry{ return &br.ManifestFileEntry{
FileName: fi.Name(), FileName: fi.Name(),
Size: fi.Size(), Size: fi.Size(),
Compression: params.Compression, Compression: params.Compression,
@ -270,7 +271,7 @@ func (c Client) downloadShardData(ctx context.Context, params *Params, shardId i
// writeManifest writes a description of all files downloaded as part of the backup process // writeManifest writes a description of all files downloaded as part of the backup process
// to the backup folder, encoded as JSON. // to the backup folder, encoded as JSON.
func (c Client) writeManifest(params *Params) error { func (c Client) writeManifest(params *Params) error {
manifestPath := filepath.Join(params.Path, fmt.Sprintf("%s.manifest", c.baseName)) manifestPath := filepath.Join(params.Path, fmt.Sprintf("%s.%s", c.baseName, br.ManifestExtension))
f, err := os.OpenFile(manifestPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) f, err := os.OpenFile(manifestPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil { if err != nil {
return err return err

View File

@ -18,6 +18,7 @@ import (
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/influxdata/influx-cli/v2/api" "github.com/influxdata/influx-cli/v2/api"
"github.com/influxdata/influx-cli/v2/clients" "github.com/influxdata/influx-cli/v2/clients"
br "github.com/influxdata/influx-cli/v2/internal/backup_restore"
"github.com/influxdata/influx-cli/v2/internal/mock" "github.com/influxdata/influx-cli/v2/internal/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -55,28 +56,28 @@ func TestBackup_DownloadMetadata(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string
compression FileCompression compression br.FileCompression
responseCompression FileCompression responseCompression br.FileCompression
}{ }{
{ {
name: "no gzip", name: "no gzip",
compression: NoCompression, compression: br.NoCompression,
responseCompression: NoCompression, responseCompression: br.NoCompression,
}, },
{ {
name: "response gzip, no local gzip", name: "response gzip, no local gzip",
compression: NoCompression, compression: br.NoCompression,
responseCompression: GzipCompression, responseCompression: br.GzipCompression,
}, },
{ {
name: "no response gzip, local gzip", name: "no response gzip, local gzip",
compression: GzipCompression, compression: br.GzipCompression,
responseCompression: NoCompression, responseCompression: br.NoCompression,
}, },
{ {
name: "all gzip", name: "all gzip",
compression: GzipCompression, compression: br.GzipCompression,
responseCompression: GzipCompression, responseCompression: br.GzipCompression,
}, },
} }
@ -94,7 +95,7 @@ func TestBackup_DownloadMetadata(t *testing.T) {
DoAndReturn(func(request api.ApiGetBackupMetadataRequest) (*http.Response, error) { DoAndReturn(func(request api.ApiGetBackupMetadataRequest) (*http.Response, error) {
out := bytes.Buffer{} out := bytes.Buffer{}
var outW io.Writer = &out var outW io.Writer = &out
if tc.responseCompression == GzipCompression { if tc.responseCompression == br.GzipCompression {
gzw := gzip.NewWriter(outW) gzw := gzip.NewWriter(outW)
defer gzw.Close() defer gzw.Close()
outW = gzw outW = gzw
@ -144,7 +145,7 @@ func TestBackup_DownloadMetadata(t *testing.T) {
res := http.Response{Header: http.Header{}, Body: ioutil.NopCloser(&out)} res := http.Response{Header: http.Header{}, Body: ioutil.NopCloser(&out)}
res.Header.Add("Content-Type", fmt.Sprintf("multipart/mixed; boundary=%s", writer.Boundary())) res.Header.Add("Content-Type", fmt.Sprintf("multipart/mixed; boundary=%s", writer.Boundary()))
if tc.responseCompression == GzipCompression { if tc.responseCompression == br.GzipCompression {
res.Header.Add("Content-Encoding", "gzip") res.Header.Add("Content-Encoding", "gzip")
} }
return &res, nil return &res, nil
@ -173,7 +174,7 @@ func TestBackup_DownloadMetadata(t *testing.T) {
defer localKv.Close() defer localKv.Close()
var kvReader io.Reader = localKv var kvReader io.Reader = localKv
if tc.compression == GzipCompression { if tc.compression == br.GzipCompression {
gzr, err := gzip.NewReader(kvReader) gzr, err := gzip.NewReader(kvReader)
require.NoError(t, err) require.NoError(t, err)
defer gzr.Close() defer gzr.Close()
@ -188,7 +189,7 @@ func TestBackup_DownloadMetadata(t *testing.T) {
defer localSql.Close() defer localSql.Close()
var sqlReader io.Reader = localSql var sqlReader io.Reader = localSql
if tc.compression == GzipCompression { if tc.compression == br.GzipCompression {
gzr, err := gzip.NewReader(sqlReader) gzr, err := gzip.NewReader(sqlReader)
require.NoError(t, err) require.NoError(t, err)
defer gzr.Close() defer gzr.Close()
@ -208,28 +209,28 @@ func TestBackup_DownloadShardData(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string
compression FileCompression compression br.FileCompression
responseCompression FileCompression responseCompression br.FileCompression
}{ }{
{ {
name: "no gzip", name: "no gzip",
compression: NoCompression, compression: br.NoCompression,
responseCompression: NoCompression, responseCompression: br.NoCompression,
}, },
{ {
name: "response gzip, no local gzip", name: "response gzip, no local gzip",
compression: NoCompression, compression: br.NoCompression,
responseCompression: GzipCompression, responseCompression: br.GzipCompression,
}, },
{ {
name: "no response gzip, local gzip", name: "no response gzip, local gzip",
compression: GzipCompression, compression: br.GzipCompression,
responseCompression: NoCompression, responseCompression: br.NoCompression,
}, },
{ {
name: "all gzip", name: "all gzip",
compression: GzipCompression, compression: br.GzipCompression,
responseCompression: GzipCompression, responseCompression: br.GzipCompression,
}, },
} }
@ -247,7 +248,7 @@ func TestBackup_DownloadShardData(t *testing.T) {
DoAndReturn(func(api.ApiGetBackupShardIdRequest) (*http.Response, error) { DoAndReturn(func(api.ApiGetBackupShardIdRequest) (*http.Response, error) {
out := bytes.Buffer{} out := bytes.Buffer{}
var outW io.Writer = &out var outW io.Writer = &out
if tc.responseCompression == GzipCompression { if tc.responseCompression == br.GzipCompression {
gzw := gzip.NewWriter(outW) gzw := gzip.NewWriter(outW)
defer gzw.Close() defer gzw.Close()
outW = gzw outW = gzw
@ -256,7 +257,7 @@ func TestBackup_DownloadShardData(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
res := http.Response{Header: http.Header{}, Body: ioutil.NopCloser(&out)} res := http.Response{Header: http.Header{}, Body: ioutil.NopCloser(&out)}
res.Header.Add("Content-Type", "application/octet-stream") res.Header.Add("Content-Type", "application/octet-stream")
if tc.responseCompression == GzipCompression { if tc.responseCompression == br.GzipCompression {
res.Header.Add("Content-Encoding", "gzip") res.Header.Add("Content-Encoding", "gzip")
} }
return &res, nil return &res, nil
@ -285,7 +286,7 @@ func TestBackup_DownloadShardData(t *testing.T) {
defer localShard.Close() defer localShard.Close()
var shardReader io.Reader = localShard var shardReader io.Reader = localShard
if tc.compression == GzipCompression { if tc.compression == br.GzipCompression {
gzr, err := gzip.NewReader(shardReader) gzr, err := gzip.NewReader(shardReader)
require.NoError(t, err) require.NoError(t, err)
defer gzr.Close() defer gzr.Close()

View File

@ -2,94 +2,53 @@ package backup
import ( import (
"fmt" "fmt"
"time"
"github.com/influxdata/influx-cli/v2/api" "github.com/influxdata/influx-cli/v2/api"
br "github.com/influxdata/influx-cli/v2/internal/backup_restore"
) )
type FileCompression int func ConvertBucketManifest(manifest api.BucketMetadataManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (br.ManifestBucketEntry, error) {
m := br.ManifestBucketEntry{
const (
NoCompression FileCompression = iota
GzipCompression
)
func (c *FileCompression) Set(v string) error {
switch v {
case "none":
*c = NoCompression
case "gzip":
*c = GzipCompression
default:
return fmt.Errorf("unsupported format: %q", v)
}
return nil
}
func (c FileCompression) String() string {
switch c {
case NoCompression:
return "none"
case GzipCompression:
return "gzip"
default:
panic("Impossible!")
}
}
type Manifest struct {
KV ManifestFileEntry `json:"kv"`
SQL ManifestFileEntry `json:"sql"`
Buckets []ManifestBucketEntry `json:"buckets"`
}
type ManifestFileEntry struct {
FileName string `json:"fileName"`
Size int64 `json:"size"`
Compression FileCompression `json:"compression"`
}
func ConvertBucketManifest(manifest api.BucketMetadataManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (ManifestBucketEntry, error) {
m := ManifestBucketEntry{
OrganizationID: manifest.OrganizationID, OrganizationID: manifest.OrganizationID,
OrganizationName: manifest.OrganizationName, OrganizationName: manifest.OrganizationName,
BucketID: manifest.BucketID, BucketID: manifest.BucketID,
BucketName: manifest.BucketName, BucketName: manifest.BucketName,
Description: manifest.Description,
DefaultRetentionPolicy: manifest.DefaultRetentionPolicy, DefaultRetentionPolicy: manifest.DefaultRetentionPolicy,
RetentionPolicies: make([]ManifestRetentionPolicy, len(manifest.RetentionPolicies)), RetentionPolicies: make([]br.ManifestRetentionPolicy, len(manifest.RetentionPolicies)),
} }
for i, rp := range manifest.RetentionPolicies { for i, rp := range manifest.RetentionPolicies {
var err error var err error
m.RetentionPolicies[i], err = ConvertRetentionPolicy(rp, getShard) m.RetentionPolicies[i], err = ConvertRetentionPolicy(rp, getShard)
if err != nil { if err != nil {
return ManifestBucketEntry{}, err return br.ManifestBucketEntry{}, err
} }
} }
return m, nil return m, nil
} }
func ConvertRetentionPolicy(manifest api.RetentionPolicyManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (ManifestRetentionPolicy, error) { func ConvertRetentionPolicy(manifest api.RetentionPolicyManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (br.ManifestRetentionPolicy, error) {
m := ManifestRetentionPolicy{ m := br.ManifestRetentionPolicy{
Name: manifest.Name, Name: manifest.Name,
ReplicaN: manifest.ReplicaN, ReplicaN: manifest.ReplicaN,
Duration: manifest.Duration, Duration: manifest.Duration,
ShardGroupDuration: manifest.ShardGroupDuration, ShardGroupDuration: manifest.ShardGroupDuration,
ShardGroups: make([]ManifestShardGroup, len(manifest.ShardGroups)), ShardGroups: make([]br.ManifestShardGroup, len(manifest.ShardGroups)),
Subscriptions: make([]ManifestSubscription, len(manifest.Subscriptions)), Subscriptions: make([]br.ManifestSubscription, len(manifest.Subscriptions)),
} }
for i, sg := range manifest.ShardGroups { for i, sg := range manifest.ShardGroups {
var err error var err error
m.ShardGroups[i], err = ConvertShardGroup(sg, getShard) m.ShardGroups[i], err = ConvertShardGroup(sg, getShard)
if err != nil { if err != nil {
return ManifestRetentionPolicy{}, err return br.ManifestRetentionPolicy{}, err
} }
} }
for i, s := range manifest.Subscriptions { for i, s := range manifest.Subscriptions {
m.Subscriptions[i] = ManifestSubscription{ m.Subscriptions[i] = br.ManifestSubscription{
Name: s.Name, Name: s.Name,
Mode: s.Mode, Mode: s.Mode,
Destinations: s.Destinations, Destinations: s.Destinations,
@ -99,20 +58,20 @@ func ConvertRetentionPolicy(manifest api.RetentionPolicyManifest, getShard func(
return m, nil return m, nil
} }
func ConvertShardGroup(manifest api.ShardGroupManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (ManifestShardGroup, error) { func ConvertShardGroup(manifest api.ShardGroupManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (br.ManifestShardGroup, error) {
m := ManifestShardGroup{ m := br.ManifestShardGroup{
ID: manifest.Id, ID: manifest.Id,
StartTime: manifest.StartTime, StartTime: manifest.StartTime,
EndTime: manifest.EndTime, EndTime: manifest.EndTime,
DeletedAt: manifest.DeletedAt, DeletedAt: manifest.DeletedAt,
TruncatedAt: manifest.TruncatedAt, TruncatedAt: manifest.TruncatedAt,
Shards: make([]ManifestShardEntry, 0, len(manifest.Shards)), Shards: make([]br.ManifestShardEntry, 0, len(manifest.Shards)),
} }
for _, sh := range manifest.Shards { for _, sh := range manifest.Shards {
maybeShard, err := ConvertShard(sh, getShard) maybeShard, err := ConvertShard(sh, getShard)
if err != nil { if err != nil {
return ManifestShardGroup{}, err return br.ManifestShardGroup{}, err
} }
// Shard deleted mid-backup. // Shard deleted mid-backup.
if maybeShard == nil { if maybeShard == nil {
@ -124,7 +83,7 @@ func ConvertShardGroup(manifest api.ShardGroupManifest, getShard func(shardId in
return m, nil return m, nil
} }
func ConvertShard(manifest api.ShardManifest, getShard func(shardId int64) (*ManifestFileEntry, error)) (*ManifestShardEntry, error) { func ConvertShard(manifest api.ShardManifest, getShard func(shardId int64) (*br.ManifestFileEntry, error)) (*br.ManifestShardEntry, error) {
shardFileInfo, err := getShard(manifest.Id) shardFileInfo, err := getShard(manifest.Id)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to download snapshot of shard %d: %w", manifest.Id, err) return nil, fmt.Errorf("failed to download snapshot of shard %d: %w", manifest.Id, err)
@ -133,60 +92,17 @@ func ConvertShard(manifest api.ShardManifest, getShard func(shardId int64) (*Man
return nil, nil return nil, nil
} }
m := ManifestShardEntry{ m := br.ManifestShardEntry{
ID: manifest.Id, ID: manifest.Id,
ShardOwners: make([]ShardOwner, len(manifest.ShardOwners)), ShardOwners: make([]br.ShardOwner, len(manifest.ShardOwners)),
ManifestFileEntry: *shardFileInfo, ManifestFileEntry: *shardFileInfo,
} }
for i, o := range manifest.ShardOwners { for i, o := range manifest.ShardOwners {
m.ShardOwners[i] = ShardOwner{ m.ShardOwners[i] = br.ShardOwner{
NodeID: o.NodeID, NodeID: o.NodeID,
} }
} }
return &m, nil return &m, nil
} }
type ManifestBucketEntry struct {
OrganizationID string `json:"organizationID"`
OrganizationName string `json:"organizationName"`
BucketID string `json:"bucketID"`
BucketName string `json:"bucketName"`
DefaultRetentionPolicy string `json:"defaultRetentionPolicy"`
RetentionPolicies []ManifestRetentionPolicy `json:"retentionPolicies"`
}
type ManifestRetentionPolicy struct {
Name string `json:"name"`
ReplicaN int32 `json:"replicaN"`
Duration int64 `json:"duration"`
ShardGroupDuration int64 `json:"shardGroupDuration"`
ShardGroups []ManifestShardGroup `json:"shardGroups"`
Subscriptions []ManifestSubscription `json:"subscriptions"`
}
type ManifestShardGroup struct {
ID int64 `json:"id"`
StartTime time.Time `json:"startTime"`
EndTime time.Time `json:"endTime"`
DeletedAt *time.Time `json:"deletedAt,omitempty"`
TruncatedAt *time.Time `json:"truncatedAt,omitempty"`
Shards []ManifestShardEntry `json:"shards"`
}
type ManifestShardEntry struct {
ID int64 `json:"id"`
ShardOwners []ShardOwner `json:"shardOwners"`
ManifestFileEntry
}
type ShardOwner struct {
NodeID int64 `json:"nodeID"`
}
type ManifestSubscription struct {
Name string `json:"name"`
Mode string `json:"mode"`
Destinations []string `json:"destinations"`
}

View File

@ -7,6 +7,7 @@ import (
"github.com/influxdata/influx-cli/v2/api" "github.com/influxdata/influx-cli/v2/api"
"github.com/influxdata/influx-cli/v2/clients/backup" "github.com/influxdata/influx-cli/v2/clients/backup"
br "github.com/influxdata/influx-cli/v2/internal/backup_restore"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -87,46 +88,46 @@ func TestConvertBucketManifest(t *testing.T) {
}, },
} }
fakeGetShard := func(id int64) (*backup.ManifestFileEntry, error) { fakeGetShard := func(id int64) (*br.ManifestFileEntry, error) {
if id == 20 { if id == 20 {
return nil, nil return nil, nil
} }
return &backup.ManifestFileEntry{ return &br.ManifestFileEntry{
FileName: fmt.Sprintf("%d.gz", id), FileName: fmt.Sprintf("%d.gz", id),
Size: id * 100, Size: id * 100,
Compression: backup.GzipCompression, Compression: br.GzipCompression,
}, nil }, nil
} }
converted, err := backup.ConvertBucketManifest(manifest, fakeGetShard) converted, err := backup.ConvertBucketManifest(manifest, fakeGetShard)
require.NoError(t, err) require.NoError(t, err)
expected := backup.ManifestBucketEntry{ expected := br.ManifestBucketEntry{
OrganizationID: "123", OrganizationID: "123",
OrganizationName: "org", OrganizationName: "org",
BucketID: "456", BucketID: "456",
BucketName: "bucket", BucketName: "bucket",
DefaultRetentionPolicy: "foo", DefaultRetentionPolicy: "foo",
RetentionPolicies: []backup.ManifestRetentionPolicy{ RetentionPolicies: []br.ManifestRetentionPolicy{
{ {
Name: "foo", Name: "foo",
ReplicaN: 1, ReplicaN: 1,
Duration: 100, Duration: 100,
ShardGroupDuration: 10, ShardGroupDuration: 10,
ShardGroups: []backup.ManifestShardGroup{ ShardGroups: []br.ManifestShardGroup{
{ {
ID: 1, ID: 1,
StartTime: now, StartTime: now,
EndTime: now, EndTime: now,
TruncatedAt: &now, TruncatedAt: &now,
Shards: []backup.ManifestShardEntry{ Shards: []br.ManifestShardEntry{
{ {
ID: 10, ID: 10,
ShardOwners: []backup.ShardOwner{{NodeID: 1}}, ShardOwners: []br.ShardOwner{{NodeID: 1}},
ManifestFileEntry: backup.ManifestFileEntry{ ManifestFileEntry: br.ManifestFileEntry{
FileName: "10.gz", FileName: "10.gz",
Size: 1000, Size: 1000,
Compression: backup.GzipCompression, Compression: br.GzipCompression,
}, },
}, },
}, },
@ -136,35 +137,35 @@ func TestConvertBucketManifest(t *testing.T) {
StartTime: now, StartTime: now,
EndTime: now, EndTime: now,
DeletedAt: &now, DeletedAt: &now,
Shards: []backup.ManifestShardEntry{ Shards: []br.ManifestShardEntry{
{ {
ID: 30, ID: 30,
ShardOwners: []backup.ShardOwner{}, ShardOwners: []br.ShardOwner{},
ManifestFileEntry: backup.ManifestFileEntry{ ManifestFileEntry: br.ManifestFileEntry{
FileName: "30.gz", FileName: "30.gz",
Size: 3000, Size: 3000,
Compression: backup.GzipCompression, Compression: br.GzipCompression,
}, },
}, },
}, },
}, },
}, },
Subscriptions: []backup.ManifestSubscription{}, Subscriptions: []br.ManifestSubscription{},
}, },
{ {
Name: "bar", Name: "bar",
ReplicaN: 3, ReplicaN: 3,
Duration: 9999, Duration: 9999,
ShardGroupDuration: 1, ShardGroupDuration: 1,
ShardGroups: []backup.ManifestShardGroup{ ShardGroups: []br.ManifestShardGroup{
{ {
ID: 3, ID: 3,
StartTime: now, StartTime: now,
EndTime: now, EndTime: now,
Shards: []backup.ManifestShardEntry{}, Shards: []br.ManifestShardEntry{},
}, },
}, },
Subscriptions: []backup.ManifestSubscription{ Subscriptions: []br.ManifestSubscription{
{ {
Name: "test", Name: "test",
Mode: "on", Mode: "on",

View File

@ -0,0 +1,79 @@
package restore
import (
"github.com/influxdata/influx-cli/v2/api"
br "github.com/influxdata/influx-cli/v2/internal/backup_restore"
)
func ConvertBucketManifest(manifest br.ManifestBucketEntry) api.BucketMetadataManifest {
m := api.BucketMetadataManifest{
OrganizationID: manifest.OrganizationID,
OrganizationName: manifest.OrganizationName,
BucketID: manifest.BucketID,
BucketName: manifest.BucketName,
Description: manifest.Description,
DefaultRetentionPolicy: manifest.DefaultRetentionPolicy,
RetentionPolicies: make([]api.RetentionPolicyManifest, len(manifest.RetentionPolicies)),
}
for i, rp := range manifest.RetentionPolicies {
m.RetentionPolicies[i] = ConvertRetentionPolicy(rp)
}
return m
}
func ConvertRetentionPolicy(manifest br.ManifestRetentionPolicy) api.RetentionPolicyManifest {
m := api.RetentionPolicyManifest{
Name: manifest.Name,
ReplicaN: manifest.ReplicaN,
Duration: manifest.Duration,
ShardGroupDuration: manifest.ShardGroupDuration,
ShardGroups: make([]api.ShardGroupManifest, len(manifest.ShardGroups)),
Subscriptions: make([]api.SubscriptionManifest, len(manifest.Subscriptions)),
}
for i, sg := range manifest.ShardGroups {
m.ShardGroups[i] = ConvertShardGroup(sg)
}
for i, s := range manifest.Subscriptions {
m.Subscriptions[i] = api.SubscriptionManifest{
Name: s.Name,
Mode: s.Mode,
Destinations: s.Destinations,
}
}
return m
}
func ConvertShardGroup(manifest br.ManifestShardGroup) api.ShardGroupManifest {
m := api.ShardGroupManifest{
Id: manifest.ID,
StartTime: manifest.StartTime,
EndTime: manifest.EndTime,
DeletedAt: manifest.DeletedAt,
TruncatedAt: manifest.TruncatedAt,
Shards: make([]api.ShardManifest, len(manifest.Shards)),
}
for i, sh := range manifest.Shards {
m.Shards[i] = ConvertShard(sh)
}
return m
}
func ConvertShard(manifest br.ManifestShardEntry) api.ShardManifest {
m := api.ShardManifest{
Id: manifest.ID,
ShardOwners: make([]api.ShardOwner, len(manifest.ShardOwners)),
}
for i, so := range manifest.ShardOwners {
m.ShardOwners[i] = api.ShardOwner{NodeID: so.NodeID}
}
return m
}

292
clients/restore/restore.go Normal file
View File

@ -0,0 +1,292 @@
package restore
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sort"
"strings"
"github.com/influxdata/influx-cli/v2/api"
"github.com/influxdata/influx-cli/v2/clients"
br "github.com/influxdata/influx-cli/v2/internal/backup_restore"
"github.com/influxdata/influx-cli/v2/pkg/gzip"
)
type Client struct {
clients.CLI
api.RestoreApi
api.OrganizationsApi
manifest br.Manifest
}
type Params struct {
// Path to local backup data created using `influx backup`
Path string
// Original ID/name of the organization to restore.
// If not set, all orgs will be restored.
OrgID string
Org string
// New name to use for the restored organization.
// If not set, the org will be restored using its backed-up name.
NewOrgName string
// Original ID/name of the bucket to restore.
// If not set, all buckets within the org filter will be restored.
BucketID string
Bucket string
// New name to use for the restored bucket.
// If not set, the bucket will be restored using its backed-up name.
NewBucketName string
// If true, replace all data on the server with the local backup.
// Otherwise only restore the requested org/bucket, leaving other data untouched.
Full bool
}
func (p *Params) matches(bkt br.ManifestBucketEntry) bool {
if p.OrgID != "" && bkt.OrganizationID != p.OrgID {
return false
}
if p.Org != "" && bkt.OrganizationName != p.Org {
return false
}
if p.BucketID != "" && bkt.BucketID != p.BucketID {
return false
}
if p.Bucket != "" && bkt.BucketName != p.Bucket {
return false
}
return true
}
func (c *Client) Restore(ctx context.Context, params *Params) error {
if err := c.loadManifests(params.Path); err != nil {
return err
}
if params.Full {
return c.fullRestore(ctx, params.Path)
}
return c.partialRestore(ctx, params)
}
// loadManifests finds and merges all backup manifests stored in a given directory,
// keeping the latest top-level metadata and latest metadata per-bucket.
func (c *Client) loadManifests(path string) error {
// Read all manifest files from path, sort in ascending time.
manifests, err := filepath.Glob(filepath.Join(path, fmt.Sprintf("*.%s", br.ManifestExtension)))
if err != nil {
return fmt.Errorf("failed to find backup manifests at %q: %w", path, err)
} else if len(manifests) == 0 {
return fmt.Errorf("no backup manifests found at %q", path)
}
sort.Strings(manifests)
bucketManifests := map[string]br.ManifestBucketEntry{}
for _, manifestFile := range manifests {
// Skip file if it is a directory.
if fi, err := os.Stat(manifestFile); err != nil {
return fmt.Errorf("failed to inspect local manifest at %q: %w", manifestFile, err)
} else if fi.IsDir() {
continue
}
var manifest br.Manifest
if buf, err := os.ReadFile(manifestFile); err != nil {
return fmt.Errorf("failed to read local manifest at %q: %w", manifestFile, err)
} else if err := json.Unmarshal(buf, &manifest); err != nil {
return fmt.Errorf("failed to parse manifest at %q: %w", manifestFile, err)
}
// Keep the latest KV and SQL overall.
c.manifest.KV = manifest.KV
c.manifest.SQL = manifest.SQL
// Keep the latest manifest per-bucket.
for _, bkt := range manifest.Buckets {
bucketManifests[bkt.BucketID] = bkt
}
}
c.manifest.Buckets = make([]br.ManifestBucketEntry, 0, len(bucketManifests))
for _, bkt := range bucketManifests {
c.manifest.Buckets = append(c.manifest.Buckets, bkt)
}
return nil
}
func (c Client) fullRestore(ctx context.Context, path string) error {
// Make sure we can read both local metadata snapshots before
kvBytes, err := c.readFileGzipped(path, c.manifest.KV)
if err != nil {
return fmt.Errorf("failed to open local KV backup at %q: %w", filepath.Join(path, c.manifest.KV.FileName), err)
}
defer kvBytes.Close()
sqlBytes, err := c.readFileGzipped(path, c.manifest.SQL)
if err != nil {
return fmt.Errorf("failed to open local SQL backup at %q: %w", filepath.Join(path, c.manifest.SQL.FileName), err)
}
defer sqlBytes.Close()
// Upload metadata snapshots to the server.
log.Println("INFO: Restoring KV snapshot")
if err := c.PostRestoreKV(ctx).
ContentEncoding("gzip").
ContentType("application/octet-stream").
Body(kvBytes).
Execute(); err != nil {
return fmt.Errorf("failed to restore KV snapshot: %w", err)
}
log.Println("INFO: Restoring SQL snapshot")
if err := c.PostRestoreSQL(ctx).
ContentEncoding("gzip").
ContentType("application/octet-stream").
Body(sqlBytes).
Execute(); err != nil {
return fmt.Errorf("failed to restore SQL snapshot: %w", err)
}
// Drill down through bucket manifests to reach shard info, and upload it.
for _, b := range c.manifest.Buckets {
for _, rp := range b.RetentionPolicies {
for _, sg := range rp.ShardGroups {
for _, s := range sg.Shards {
if err := c.restoreShard(ctx, path, s); err != nil {
return err
}
}
}
}
}
return nil
}
func (c Client) partialRestore(ctx context.Context, params *Params) (err error) {
orgIds := map[string]string{}
for _, bkt := range c.manifest.Buckets {
// Skip internal buckets.
if strings.HasPrefix(bkt.BucketName, "_") {
continue
}
if !params.matches(bkt) {
continue
}
orgName := bkt.OrganizationName
// Before this method is called, we ensure that new-org-name is only set if
// a filter on org-name or org-id is set. If that check passes and execution
// reaches this code, we can assume that all buckets matching the filter come
// from the same org, so we can swap in the new org name unconditionally.
if params.NewOrgName != "" {
orgName = params.NewOrgName
}
if _, ok := orgIds[orgName]; !ok {
orgIds[orgName], err = c.restoreOrg(ctx, orgName)
if err != nil {
return
}
}
// By the same reasoning as above, if new-bucket-name is non-empty we know
// filters must have been set to ensure we only match 1 bucket, so we can
// swap the name without additional checks.
if params.NewBucketName != "" {
bkt.BucketName = params.NewBucketName
}
log.Printf("INFO: Restoring bucket %q as %q\n", bkt.BucketID, bkt.BucketName)
bucketMapping, err := c.PostRestoreBucketMetadata(ctx).
BucketMetadataManifest(ConvertBucketManifest(bkt)).
Execute()
if err != nil {
return fmt.Errorf("failed to restore bucket %q: %w", bkt.BucketName, err)
}
shardIdMap := make(map[int64]int64, len(bucketMapping.ShardMappings))
for _, mapping := range bucketMapping.ShardMappings {
shardIdMap[mapping.OldId] = mapping.NewId
}
for _, rp := range bkt.RetentionPolicies {
for _, sg := range rp.ShardGroups {
for _, sh := range sg.Shards {
newID, ok := shardIdMap[sh.ID]
if !ok {
log.Printf("WARN: Server didn't map ID for shard %d in bucket %q, skipping\n", sh.ID, bkt.BucketName)
continue
}
sh.ID = newID
if err := c.restoreShard(ctx, params.Path, sh); err != nil {
return err
}
}
}
}
}
return nil
}
// restoreOrg gets the ID for the org with the given name, creating the org if it doesn't already exist.
func (c Client) restoreOrg(ctx context.Context, name string) (string, error) {
orgs, err := c.GetOrgs(ctx).Org(name).Execute()
if err != nil {
return "", fmt.Errorf("failed to check existence of organization %q: %w", name, err)
}
if len(orgs.GetOrgs()) == 0 {
// Create any missing orgs.
newOrg, err := c.PostOrgs(ctx).PostOrganizationRequest(api.PostOrganizationRequest{Name: name}).Execute()
if err != nil {
return "", fmt.Errorf("failed to create organization %q: %w", name, err)
}
return *newOrg.Id, nil
}
return *orgs.GetOrgs()[0].Id, nil
}
// readFileGzipped opens a local file and returns a reader of its contents,
// compressed with gzip.
func (c Client) readFileGzipped(path string, file br.ManifestFileEntry) (io.ReadCloser, error) {
fullPath := filepath.Join(path, file.FileName)
f, err := os.Open(fullPath)
if err != nil {
return nil, err
}
if file.Compression == br.GzipCompression {
return f, nil
}
return gzip.NewGzipPipe(f), nil
}
func (c Client) restoreShard(ctx context.Context, path string, m br.ManifestShardEntry) error {
// Make sure we can read the local snapshot.
tsmBytes, err := c.readFileGzipped(path, m.ManifestFileEntry)
if err != nil {
return fmt.Errorf("failed to open local TSM snapshot at %q: %w", filepath.Join(path, m.FileName), err)
}
defer tsmBytes.Close()
log.Printf("INFO: Restoring TSM snapshot for shard %d\n", m.ID)
if err := c.PostRestoreShardId(ctx, fmt.Sprintf("%d", m.ID)).
ContentEncoding("gzip").
ContentType("application/octet-stream").
Body(tsmBytes).
Execute(); err != nil {
return fmt.Errorf("failed to restore TSM snapshot for shard %d: %w", m.ID, err)
}
return nil
}

View File

@ -4,6 +4,7 @@ import (
"errors" "errors"
"github.com/influxdata/influx-cli/v2/clients/backup" "github.com/influxdata/influx-cli/v2/clients/backup"
br "github.com/influxdata/influx-cli/v2/internal/backup_restore"
"github.com/influxdata/influx-cli/v2/pkg/cli/middleware" "github.com/influxdata/influx-cli/v2/pkg/cli/middleware"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
) )
@ -11,7 +12,7 @@ import (
func newBackupCmd() *cli.Command { func newBackupCmd() *cli.Command {
var params backup.Params var params backup.Params
// Default to gzipping local files. // Default to gzipping local files.
params.Compression = backup.GzipCompression params.Compression = br.GzipCompression
return &cli.Command{ return &cli.Command{
Name: "backup", Name: "backup",

View File

@ -45,6 +45,7 @@ var app = cli.App{
newUserCmd(), newUserCmd(),
newTaskCommand(), newTaskCommand(),
newBackupCmd(), newBackupCmd(),
newRestoreCmd(),
newTelegrafsCommand(), newTelegrafsCommand(),
}, },
} }

98
cmd/influx/restore.go Normal file
View File

@ -0,0 +1,98 @@
package main
import (
"errors"
"github.com/influxdata/influx-cli/v2/clients/restore"
"github.com/influxdata/influx-cli/v2/pkg/cli/middleware"
"github.com/urfave/cli/v2"
)
func newRestoreCmd() *cli.Command {
var params restore.Params
return &cli.Command{
Name: "restore",
Usage: "Restores a backup directory to InfluxDB",
Description: `Restore influxdb.
Examples:
# backup all data
influx restore /path/to/restore
`,
ArgsUsage: "path",
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
Flags: append(
commonFlagsNoPrint(),
&cli.BoolFlag{
Name: "full",
Usage: "Fully restore and replace all data on server",
Destination: &params.Full,
},
&cli.StringFlag{
Name: "org-id",
Usage: "The original ID of the organization to restore",
EnvVars: []string{"INFLUX_ORG_ID"},
Destination: &params.OrgID,
},
&cli.StringFlag{
Name: "org",
Usage: "The original name of the organization to restore",
Aliases: []string{"o"},
EnvVars: []string{"INFLUX_ORG"},
Destination: &params.Org,
},
&cli.StringFlag{
Name: "bucket-id",
Usage: "The original ID of the bucket to restore",
Destination: &params.BucketID,
},
&cli.StringFlag{
Name: "bucket",
Usage: "The original name of the bucket to restore",
Aliases: []string{"b"},
Destination: &params.Bucket,
},
&cli.StringFlag{
Name: "new-bucket",
Usage: "New name to use for the restored bucket",
Destination: &params.NewBucketName,
},
&cli.StringFlag{
Name: "new-org",
Usage: "New name to use for the restored organization",
Destination: &params.NewOrgName,
},
),
Action: func(ctx *cli.Context) error {
if ctx.NArg() != 1 {
return errors.New("restore path must be specified as a single positional argument")
}
params.Path = ctx.Args().Get(0)
if params.Full && (params.Org != "" ||
params.OrgID != "" ||
params.Bucket != "" ||
params.BucketID != "" ||
params.NewOrgName != "" ||
params.NewBucketName != "") {
return errors.New("--full restore cannot be limited to a single org or bucket")
}
if params.NewOrgName != "" && params.OrgID == "" && params.Org == "" {
return errors.New("--org-id or --org must be set to use --new-org")
}
if params.NewBucketName != "" && params.BucketID == "" && params.Bucket == "" {
return errors.New("--bucket-id or --bucket must be set to use --new-bucket")
}
api := getAPI(ctx)
client := restore.Client{
CLI: getCLI(ctx),
RestoreApi: api.RestoreApi,
OrganizationsApi: api.OrganizationsApi,
}
return client.Restore(ctx.Context, &params)
},
}
}

View File

@ -0,0 +1,94 @@
package backup_restore
import (
"fmt"
"time"
)
type FileCompression int
const (
NoCompression FileCompression = iota
GzipCompression
)
func (c *FileCompression) Set(v string) error {
switch v {
case "none":
*c = NoCompression
case "gzip":
*c = GzipCompression
default:
return fmt.Errorf("unsupported format: %q", v)
}
return nil
}
func (c FileCompression) String() string {
switch c {
case NoCompression:
return "none"
case GzipCompression:
return "gzip"
default:
panic("Impossible!")
}
}
const ManifestExtension = "manifest"
type Manifest struct {
KV ManifestFileEntry `json:"kv"`
SQL ManifestFileEntry `json:"sql"`
Buckets []ManifestBucketEntry `json:"buckets"`
}
type ManifestFileEntry struct {
FileName string `json:"fileName"`
Size int64 `json:"size"`
Compression FileCompression `json:"compression"`
}
type ManifestBucketEntry struct {
OrganizationID string `json:"organizationID"`
OrganizationName string `json:"organizationName"`
BucketID string `json:"bucketID"`
BucketName string `json:"bucketName"`
Description *string `json:"description,omitempty"`
DefaultRetentionPolicy string `json:"defaultRetentionPolicy"`
RetentionPolicies []ManifestRetentionPolicy `json:"retentionPolicies"`
}
type ManifestRetentionPolicy struct {
Name string `json:"name"`
ReplicaN int32 `json:"replicaN"`
Duration int64 `json:"duration"`
ShardGroupDuration int64 `json:"shardGroupDuration"`
ShardGroups []ManifestShardGroup `json:"shardGroups"`
Subscriptions []ManifestSubscription `json:"subscriptions"`
}
type ManifestShardGroup struct {
ID int64 `json:"id"`
StartTime time.Time `json:"startTime"`
EndTime time.Time `json:"endTime"`
DeletedAt *time.Time `json:"deletedAt,omitempty"`
TruncatedAt *time.Time `json:"truncatedAt,omitempty"`
Shards []ManifestShardEntry `json:"shards"`
}
type ManifestShardEntry struct {
ID int64 `json:"id"`
ShardOwners []ShardOwner `json:"shardOwners"`
ManifestFileEntry
}
type ShardOwner struct {
NodeID int64 `json:"nodeID"`
}
type ManifestSubscription struct {
Name string `json:"name"`
Mode string `json:"mode"`
Destinations []string `json:"destinations"`
}