
package restore

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"github.com/influxdata/influx-cli/v2/api"
	"github.com/influxdata/influx-cli/v2/clients"
	br "github.com/influxdata/influx-cli/v2/internal/backup_restore"
	"github.com/influxdata/influx-cli/v2/pkg/gzip"
	"google.golang.org/protobuf/proto"
)
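
// ApiConfig exposes the generated API configuration so the restore client can
// read and update default request headers; fullRestore uses it to swap in the
// new operator token returned by a KV restore.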
type ApiConfig interface {
	GetConfig() *api.Configuration
}

type Client struct {
	clients.CLI
	api.HealthApi
	api.RestoreApi
	api.BucketsApi
	api.OrganizationsApi
	ApiConfig

	manifest br.Manifest
}

type Params struct {
	// Path to local backup data created using `influx backup`
	Path string

	// Original ID/name of the organization to restore.
	// If not set, all orgs will be restored.
	clients.OrgParams

	// New name to use for the restored organization.
	// If not set, the org will be restored using its backed-up name.
	NewOrgName string

	// Original ID/name of the bucket to restore.
	// If not set, all buckets within the org filter will be restored.
	clients.BucketParams

	// New name to use for the restored bucket.
	// If not set, the bucket will be restored using its backed-up name.
	NewBucketName string

	// If true, replace all data on the server with the local backup.
	// Otherwise only restore the requested org/bucket, leaving other data untouched.
	Full bool
}
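
// matches reports whether a bucket entry from the backup manifest passes the
// org/bucket filters in p. Empty filter fields match everything.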
func (p *Params) matches(bkt br.ManifestBucketEntry) bool {
	if p.OrgID != "" && bkt.OrganizationID != p.OrgID {
		return false
	}
	if p.OrgName != "" && bkt.OrganizationName != p.OrgName {
		return false
	}
	if p.BucketID != "" && bkt.BucketID != p.BucketID {
		return false
	}
	if p.BucketName != "" && bkt.BucketName != p.BucketName {
		return false
	}
	return true
}
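
// Restore loads the backup manifests under params.Path and replays the backed-up
// metadata and shard data against the target server, performing either a full
// or a partial (filtered) restore.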
func (c *Client) Restore(ctx context.Context, params *Params) error {
	if err := c.loadManifests(params.Path); err != nil {
		return err
	}

	// The APIs we use to restore data depend on the server's version.
	legacyServer, err := br.ServerIsLegacy(ctx, c.HealthApi)
	if err != nil {
		return err
	}

	if params.Full {
		return c.fullRestore(ctx, params.Path, legacyServer)
	}
	return c.partialRestore(ctx, params, legacyServer)
}

// loadManifests finds and merges all backup manifests stored in a given directory,
// keeping the latest top-level metadata and latest metadata per-bucket.
func (c *Client) loadManifests(path string) error {
	// Read all manifest files from path, sort in ascending time.
	manifests, err := filepath.Glob(filepath.Join(path, fmt.Sprintf("*.%s", br.ManifestExtension)))
	if err != nil {
		return fmt.Errorf("failed to find backup manifests at %q: %w", path, err)
	} else if len(manifests) == 0 {
		return fmt.Errorf("no backup manifests found at %q", path)
	}
	sort.Strings(manifests)

	bucketManifests := map[string]br.ManifestBucketEntry{}
	for _, manifestFile := range manifests {
		// Skip file if it is a directory.
		if fi, err := os.Stat(manifestFile); err != nil {
			return fmt.Errorf("failed to inspect local manifest at %q: %w", manifestFile, err)
		} else if fi.IsDir() {
			continue
		}

		manifest, err := readManifest(manifestFile)
		if err != nil {
			return err
		}

		// Keep the latest KV and SQL overall.
		c.manifest.KV = manifest.KV
		c.manifest.SQL = manifest.SQL

		// Keep the latest manifest per-bucket.
		for _, bkt := range manifest.Buckets {
			// NOTE: Deduplicate here by keeping only the latest entry for each `<org-name>/<bucket-name>` pair.
			// This prevents "bucket already exists" errors during the restore when the backup manifests contain
			// entries for multiple buckets with the same name (which can happen when a bucket is deleted & re-created).
			bucketManifests[fmt.Sprintf("%s/%s", bkt.OrganizationName, bkt.BucketName)] = bkt
		}
	}

	c.manifest.Buckets = make([]br.ManifestBucketEntry, 0, len(bucketManifests))
	for _, bkt := range bucketManifests {
		c.manifest.Buckets = append(c.manifest.Buckets, bkt)
	}

	return nil
}

// fullRestore completely replaces all metadata and data on the server with the contents of a local backup.
func (c Client) fullRestore(ctx context.Context, path string, legacy bool) error {
	if legacy && c.manifest.SQL != nil {
		return fmt.Errorf("cannot fully restore data from %s: target server's version too old to restore SQL metadata", path)
	}

	// Make sure we can read both local metadata snapshots before sending anything to the server.
	readKv := readFileGzipped
	if legacy {
		// The legacy API didn't support gzipped uploads.
		readKv = readFileGunzipped
	}
	kvBytes, err := readKv(path, c.manifest.KV)
	if err != nil {
		return fmt.Errorf("failed to open local KV backup at %q: %w", filepath.Join(path, c.manifest.KV.FileName), err)
	}
	defer kvBytes.Close()

	var sqlBytes io.ReadCloser
	if c.manifest.SQL != nil {
		sqlBytes, err = readFileGzipped(path, *c.manifest.SQL)
		if err != nil {
			return fmt.Errorf("failed to open local SQL backup at %q: %w", filepath.Join(path, c.manifest.SQL.FileName), err)
		}
		defer sqlBytes.Close()
	}

	// Upload metadata snapshots to the server.
	log.Println("INFO: Restoring KV snapshot")
	kvReq := c.PostRestoreKV(ctx).ContentType("application/octet-stream").Body(kvBytes)
	if !legacy {
		kvReq = kvReq.ContentEncoding("gzip")
	}

	// Restoring the KV snapshot can replace the operator token; capture the new one
	// so any follow-up requests in this process keep authenticating successfully.
	newOperatorToken, err := kvReq.Execute()
	if err != nil {
		return fmt.Errorf("failed to restore KV snapshot: %w", err)
	}
	if newOperatorToken.Token != nil {
		newAuthorization := fmt.Sprintf("Token %s", *newOperatorToken.Token)
		const authorizationHeader = "Authorization"
		if newAuthorization != c.ApiConfig.GetConfig().DefaultHeader[authorizationHeader] {
			log.Println("WARN: Restoring KV snapshot overwrote the operator token, ensure following commands use the correct token")
			c.ApiConfig.GetConfig().DefaultHeader[authorizationHeader] = newAuthorization
		}
	}

	// TODO: Should we have some way of wiping out any existing SQL on the server-side in the case when there is no backup?
	if c.manifest.SQL != nil {
		// NOTE: No logic here to upload non-gzipped data because legacy=true doesn't support SQL restores.
		log.Println("INFO: Restoring SQL snapshot")
		if err := c.PostRestoreSQL(ctx).
			ContentEncoding("gzip").
			ContentType("application/octet-stream").
			Body(sqlBytes).
			Execute(); err != nil {
			return fmt.Errorf("failed to restore SQL snapshot: %w", err)
		}
	}

	// Drill down through bucket manifests to reach shard info, and upload it.
	for _, b := range c.manifest.Buckets {
		for _, rp := range b.RetentionPolicies {
			for _, sg := range rp.ShardGroups {
				for _, s := range sg.Shards {
					if err := c.restoreShard(ctx, path, s, legacy); err != nil {
						return err
					}
				}
			}
		}
	}

	return nil
}

// partialRestore creates a bucket (or buckets) on the target server, and seeds it with data
// from a local backup.
func (c Client) partialRestore(ctx context.Context, params *Params, legacy bool) (err error) {
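	// Cache org IDs by name so each org referenced by the backup is looked up
	// (or created) only once.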
	orgIds := map[string]string{}

	for _, bkt := range c.manifest.Buckets {
		// Skip internal buckets.
		if strings.HasPrefix(bkt.BucketName, "_") {
			continue
		}
		if !params.matches(bkt) {
			continue
		}

		orgName := bkt.OrganizationName
		// Before this method is called, we ensure that new-org-name is only set if
		// a filter on org-name or org-id is set. If that check passes and execution
		// reaches this code, we can assume that all buckets matching the filter come
		// from the same org, so we can swap in the new org name unconditionally.
		if params.NewOrgName != "" {
			orgName = params.NewOrgName
		}

		if _, ok := orgIds[orgName]; !ok {
			orgIds[orgName], err = c.restoreOrg(ctx, orgName)
			if err != nil {
				return
			}
		}
		bkt.OrganizationName = orgName
		bkt.OrganizationID = orgIds[orgName]

		// By the same reasoning as above, if new-bucket-name is non-empty we know
		// filters must have been set to ensure we only match 1 bucket, so we can
		// swap the name without additional checks.
		if params.NewBucketName != "" {
			bkt.BucketName = params.NewBucketName
		}

		restoreBucket := c.restoreBucket
		if legacy {
			restoreBucket = c.restoreBucketLegacy
		}
		shardIdMap, err := restoreBucket(ctx, bkt)
		if err != nil {
			return fmt.Errorf("failed to restore bucket %q: %w", bkt.BucketName, err)
		}

		for _, rp := range bkt.RetentionPolicies {
			for _, sg := range rp.ShardGroups {
				for _, sh := range sg.Shards {
					newID, ok := shardIdMap[sh.ID]
					if !ok {
						log.Printf("WARN: Server didn't map ID for shard %d in bucket %q, skipping\n", sh.ID, bkt.BucketName)
						continue
					}
					sh.ID = newID
					if err := c.restoreShard(ctx, params.Path, sh, legacy); err != nil {
						return err
					}
				}
			}
		}
	}

	return nil
}

// restoreBucket creates a new bucket and pre-generates a set of shards within that bucket, returning
// a mapping between the shard IDs stored in a local backup and the new shard IDs generated on the server.
func (c Client) restoreBucket(ctx context.Context, bkt br.ManifestBucketEntry) (map[int64]int64, error) {
	log.Printf("INFO: Restoring bucket %q as %q\n", bkt.BucketID, bkt.BucketName)
	bucketMapping, err := c.PostRestoreBucketMetadata(ctx).
		BucketMetadataManifest(ConvertBucketManifest(bkt)).
		Execute()
	if err != nil {
		return nil, err
	}

	shardIdMap := make(map[int64]int64, len(bucketMapping.ShardMappings))
	for _, mapping := range bucketMapping.ShardMappings {
		shardIdMap[mapping.OldId] = mapping.NewId
	}
	return shardIdMap, nil
}

// restoreBucketLegacy creates a new bucket and pre-generates a set of shards within that bucket, returning
// a mapping between the shard IDs stored in a local backup and the new shard IDs generated on the server.
//
// The server-side logic to do all this was introduced in v2.1.0. To support using newer CLI versions against
// v2.0.x of the server, we replicate the logic here via multiple API calls.
func (c Client) restoreBucketLegacy(ctx context.Context, bkt br.ManifestBucketEntry) (map[int64]int64, error) {
	log.Printf("INFO: Restoring bucket %q as %q using legacy APIs\n", bkt.BucketID, bkt.BucketName)
	// Legacy APIs require creating the bucket as a separate call.
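	// Manifest durations are nanosecond counts (time.Duration values), while the
	// bucket API expects retention rules in seconds, so convert as we copy them over.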
	rps := make([]api.RetentionRule, len(bkt.RetentionPolicies))
	for i, rp := range bkt.RetentionPolicies {
		rps[i] = *api.NewRetentionRuleWithDefaults()
		rps[i].EverySeconds = int64(time.Duration(rp.Duration).Seconds())
		sgd := int64(time.Duration(rp.ShardGroupDuration).Seconds())
		rps[i].ShardGroupDurationSeconds = &sgd
	}

	bucketReq := *api.NewPostBucketRequest(bkt.OrganizationID, bkt.BucketName)
	bucketReq.RetentionRules = &rps
	bucketReq.Description = bkt.Description

	newBkt, err := c.PostBuckets(ctx).PostBucketRequest(bucketReq).Execute()
	if err != nil {
		return nil, fmt.Errorf("couldn't create bucket %q: %w", bkt.BucketName, err)
	}

	dbi := bucketToDBI(bkt)
	dbiBytes, err := proto.Marshal(dbi)
	if err != nil {
		return nil, err
	}

	shardMapJSON, err := c.PostRestoreBucketID(ctx, *newBkt.Id).Body(dbiBytes).Execute()
	if err != nil {
		return nil, fmt.Errorf("couldn't restore database info for %q: %w", bkt.BucketName, err)
	}

	var shardMap map[int64]int64
	if err := json.Unmarshal([]byte(shardMapJSON), &shardMap); err != nil {
		return nil, fmt.Errorf("couldn't parse result of restoring database info for %q: %w", bkt.BucketName, err)
	}

	return shardMap, nil
}

// restoreOrg gets the ID for the org with the given name, creating the org if it doesn't already exist.
func (c Client) restoreOrg(ctx context.Context, name string) (string, error) {
	// NOTE: Our orgs API returns a 404 instead of an empty list when filtering by a specific name.
	orgs, err := c.GetOrgs(ctx).Org(name).Execute()
	if err != nil {
		if apiErr, ok := err.(api.ApiError); !ok || apiErr.ErrorCode() != api.ERRORCODE_NOT_FOUND {
			return "", fmt.Errorf("failed to check existence of organization %q: %w", name, err)
		}
	}

	// If we've gotten this far and err != nil, it means err was a 404.
	if err != nil || len(orgs.GetOrgs()) == 0 {
		// Create any missing orgs.
		newOrg, err := c.PostOrgs(ctx).PostOrganizationRequest(api.PostOrganizationRequest{Name: name}).Execute()
		if err != nil {
			return "", fmt.Errorf("failed to create organization %q: %w", name, err)
		}
		return *newOrg.Id, nil
	}

	return *orgs.GetOrgs()[0].Id, nil
}
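
// bucketToDBI converts a bucket manifest entry into the protobuf DatabaseInfo
// expected by the legacy restore API. Note that the bucket ID, not the bucket
// name, is used as the database name.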
func bucketToDBI(b br.ManifestBucketEntry) *br.DatabaseInfo {
	dbi := br.DatabaseInfo{
		Name: &b.BucketID,
		DefaultRetentionPolicy: &b.DefaultRetentionPolicy,
		RetentionPolicies: make([]*br.RetentionPolicyInfo, len(b.RetentionPolicies)),
		ContinuousQueries: nil,
	}
	for i, rp := range b.RetentionPolicies {
		converted := retentionPolicyToRPI(rp)
		dbi.RetentionPolicies[i] = converted
	}
	return &dbi
}
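
// retentionPolicyToRPI converts a retention policy from the backup manifest into
// the protobuf RetentionPolicyInfo used by the legacy restore API, including its
// shard groups and subscriptions.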
func retentionPolicyToRPI(rp br.ManifestRetentionPolicy) *br.RetentionPolicyInfo {
	replicaN := uint32(rp.ReplicaN)
	rpi := br.RetentionPolicyInfo{
		Name: &rp.Name,
		Duration: &rp.Duration,
		ShardGroupDuration: &rp.ShardGroupDuration,
		ReplicaN: &replicaN,
		ShardGroups: make([]*br.ShardGroupInfo, len(rp.ShardGroups)),
		Subscriptions: make([]*br.SubscriptionInfo, len(rp.Subscriptions)),
	}
	for i, sg := range rp.ShardGroups {
		converted := shardGroupToSGI(sg)
		rpi.ShardGroups[i] = converted
	}
	for i, s := range rp.Subscriptions {
		converted := br.SubscriptionInfo{
			Name: &s.Name,
			Mode: &s.Mode,
			Destinations: s.Destinations,
		}
		rpi.Subscriptions[i] = &converted
	}
	return &rpi
}
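
// shardGroupToSGI converts a shard-group manifest entry into the protobuf
// ShardGroupInfo, flattening the start/end/deleted/truncated timestamps to
// Unix nanoseconds.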
func shardGroupToSGI(sg br.ManifestShardGroup) *br.ShardGroupInfo {
	id := uint64(sg.ID)
	start := sg.StartTime.UnixNano()
	end := sg.EndTime.UnixNano()
	var deleted, truncated int64
	if sg.DeletedAt != nil {
		deleted = sg.DeletedAt.UnixNano()
	}
	if sg.TruncatedAt != nil {
		truncated = sg.TruncatedAt.UnixNano()
	}
	sgi := br.ShardGroupInfo{
		ID: &id,
		StartTime: &start,
		EndTime: &end,
		DeletedAt: &deleted,
		Shards: make([]*br.ShardInfo, len(sg.Shards)),
		TruncatedAt: &truncated,
	}
	for i, s := range sg.Shards {
		converted := shardToSI(s)
		sgi.Shards[i] = converted
	}
	return &sgi
}
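
// shardToSI converts a shard manifest entry into the protobuf ShardInfo,
// carrying over the shard ID and its owners.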
func shardToSI(shard br.ManifestShardEntry) *br.ShardInfo {
	id := uint64(shard.ID)
	si := br.ShardInfo{
		ID: &id,
		Owners: make([]*br.ShardOwner, len(shard.ShardOwners)),
	}
	for i, o := range shard.ShardOwners {
		oid := uint64(o.NodeID)
		converted := br.ShardOwner{NodeID: &oid}
		si.Owners[i] = &converted
	}
	return &si
}

// readFileGzipped opens a local file and returns a reader of its contents,
// compressed with gzip.
func readFileGzipped(path string, file br.ManifestFileEntry) (io.ReadCloser, error) {
	fullPath := filepath.Join(path, file.FileName)
	f, err := os.Open(fullPath)
	if err != nil {
		return nil, err
	}
	if file.Compression == br.GzipCompression {
		return f, nil
	}
	return gzip.NewGzipPipe(f), nil
}

// readFileGunzipped opens a local file and returns a reader of its contents,
// gunzipping it if it is compressed.
func readFileGunzipped(path string, file br.ManifestFileEntry) (io.ReadCloser, error) {
	fullPath := filepath.Join(path, file.FileName)
	f, err := os.Open(fullPath)
	if err != nil {
		return nil, err
	}
	if file.Compression == br.NoCompression {
		return f, nil
	}
	reader, err := gzip.NewGunzipReadCloser(f)
	if err != nil {
		f.Close()
		return nil, err
	}
	return reader, nil
}

// restoreShard overwrites the contents of a single shard on the server using TSM stored in a local backup.
func (c Client) restoreShard(ctx context.Context, path string, m br.ManifestShardEntry, legacy bool) error {
	read := readFileGzipped
	if legacy {
		// The legacy API didn't support gzipped uploads.
		read = readFileGunzipped
	}
	// Make sure we can read the local snapshot.
	tsmBytes, err := read(path, m.ManifestFileEntry)
	if err != nil {
		return fmt.Errorf("failed to open local TSM snapshot at %q: %w", filepath.Join(path, m.FileName), err)
	}
	defer tsmBytes.Close()

	req := c.PostRestoreShardId(ctx, fmt.Sprintf("%d", m.ID)).
		ContentType("application/octet-stream").
		Body(tsmBytes)
	if !legacy {
		req = req.ContentEncoding("gzip")
	}
	log.Printf("INFO: Restoring TSM snapshot for shard %d\n", m.ID)
	if err := req.Execute(); err != nil {
		return fmt.Errorf("failed to restore TSM snapshot for shard %d: %w", m.ID, err)
	}
	return nil
}