diff --git a/clients/backup/backup.go b/clients/backup/backup.go index a37b7d1..40edc5c 100644 --- a/clients/backup/backup.go +++ b/clients/backup/backup.go @@ -11,6 +11,8 @@ import ( "mime/multipart" "os" "path/filepath" + "regexp" + "strconv" "time" "github.com/influxdata/influx-cli/v2/api" @@ -20,6 +22,7 @@ import ( type Client struct { clients.CLI + api.HealthApi api.BackupApi // Local state tracked across steps in the backup process. @@ -70,9 +73,20 @@ func (c *Client) Backup(ctx context.Context, params *Params) error { } c.baseName = time.Now().UTC().Format(backupFilenamePattern) - if err := c.downloadMetadata(ctx, params); err != nil { + // The APIs we use to back up metadata depends on the server's version. + legacyServer, err := c.serverIsLegacy(ctx) + if err != nil { + return err + } + backupMetadata := c.downloadMetadata + if legacyServer { + backupMetadata = c.downloadMetadataLegacy + } + if err := backupMetadata(ctx, params); err != nil { return fmt.Errorf("failed to backup metadata: %w", err) } + + // Once metadata has been fetched, things are consistent across versions. if err := c.downloadBucketData(ctx, params); err != nil { return fmt.Errorf("failed to backup bucket data: %w", err) } @@ -82,6 +96,40 @@ func (c *Client) Backup(ctx context.Context, params *Params) error { return nil } +var semverRegex = regexp.MustCompile(`(\d+)\.(\d+)\.(\d+).*`) + +// serverIsLegacy checks if the InfluxDB server targeted by the backup is running v2.0.x, +// which used different APIs for backups. +func (c Client) serverIsLegacy(ctx context.Context) (bool, error) { + res, err := c.GetHealth(ctx).Execute() + if err != nil { + return false, fmt.Errorf("API compatibility check failed: %w", err) + } + var version string + if res.Version != nil { + version = *res.Version + } + + matches := semverRegex.FindSubmatch([]byte(version)) + if matches == nil { + // Assume non-semver versions are only reported by nightlies & dev builds, which + // should now support the new APIs. + log.Printf("WARN: Couldn't parse version %q reported by server, assuming latest backup APIs are supported", version) + return false, nil + } + // matches[0] is the entire matched string, capture groups start at 1. + majorStr, minorStr := matches[1], matches[2] + // Ignore the err values here because the regex-match ensures we can parse the captured + // groups as integers. + major, _ := strconv.Atoi(string(majorStr)) + minor, _ := strconv.Atoi(string(minorStr)) + + if major < 2 { + return false, fmt.Errorf("InfluxDB v%d does not support the APIs required for backups", major) + } + return minor == 0, nil +} + // downloadMetadata downloads a snapshot of the KV store, SQL DB, and bucket // manifests from the server. KV and SQL are written to local files. Bucket manifests // are parsed into a slice for additional processing. @@ -91,6 +139,7 @@ func (c *Client) downloadMetadata(ctx context.Context, params *Params) error { if err != nil { return fmt.Errorf("failed to download metadata snapshot: %w", err) } + defer rawResp.Body.Close() kvName := fmt.Sprintf("%s.bolt", c.baseName) sqlName := fmt.Sprintf("%s.sqlite", c.baseName) @@ -169,7 +218,7 @@ func (c *Client) downloadMetadata(ctx context.Context, params *Params) error { if err != nil { return fmt.Errorf("failed to save local copy of SQL backup to %q: %w", sqlName, err) } - c.manifest.SQL = fi + c.manifest.SQL = &fi case "buckets": if err := json.NewDecoder(part).Decode(&c.bucketMetadata); err != nil { return fmt.Errorf("failed to decode bucket manifest from backup: %w", err) @@ -181,6 +230,85 @@ func (c *Client) downloadMetadata(ctx context.Context, params *Params) error { return nil } +// downloadMetadataLegacy downloads a snapshot of the KV store from the server, and extracts +// a bucket manifest from the result. KV is written to a local file; the extracted manifest +// is tracked as a slice for additional processing. +// +// NOTE: This should _not_ be used against an InfluxDB instance running v2.1.0 or later, as +// it will fail to capture metadata stored in SQL. +func (c *Client) downloadMetadataLegacy(ctx context.Context, params *Params) error { + log.Println("INFO: Downloading legacy KV snapshot") + rawResp, err := c.GetBackupKV(ctx).Execute() + if err != nil { + return fmt.Errorf("failed to download KV snapshot: %w", err) + } + defer rawResp.Body.Close() + + kvName := filepath.Join(params.Path, fmt.Sprintf("%s.bolt", c.baseName)) + tmpKv := fmt.Sprintf("%s.tmp", kvName) + defer os.RemoveAll(tmpKv) + + // Since we need to read the bolt DB to extract a manifest, always save it uncompressed locally. + if err := func() error { + f, err := os.Create(tmpKv) + if err != nil { + return err + } + defer f.Close() + + _, err = io.Copy(f, rawResp.Body) + return err + }(); err != nil { + return fmt.Errorf("failed to save downloaded KV snapshot: %w", err) + } + + // Extract the metadata we need from the downloaded KV store, and convert it to a new-style manifest. + log.Println("INFO: Extracting bucket manifest from legacy KV snapshot") + c.bucketMetadata, err = extractBucketManifest(tmpKv) + if err != nil { + return fmt.Errorf("failed to extract bucket metadata from downloaded KV snapshot: %w", err) + } + + // Move/compress the bolt DB into its final location. + if err := func() error { + if params.Compression == br.NoCompression { + return os.Rename(tmpKv, kvName) + } + + tmpIn, err := os.Open(tmpKv) + if err != nil { + return err + } + defer tmpIn.Close() + + kvName = kvName + ".gz" + out, err := os.Create(kvName) + if err != nil { + return err + } + defer out.Close() + + gzw := gzip.NewWriter(out) + defer gzw.Close() + + _, err = io.Copy(gzw, tmpIn) + return err + }(); err != nil { + return fmt.Errorf("failed to rename downloaded KV snapshot: %w", err) + } + + fi, err := os.Stat(kvName) + if err != nil { + return fmt.Errorf("failed to inspect local KV snapshot: %w", err) + } + c.manifest.KV = br.ManifestFileEntry{ + FileName: fi.Name(), + Size: fi.Size(), + Compression: params.Compression, + } + return nil +} + // downloadBucketData downloads TSM snapshots for each shard in the buckets matching // the filter parameters provided over the CLI. Snapshots are written to local files. // diff --git a/clients/backup/backup_internal_test.go b/clients/backup/backup_internal_test.go index 643a2fa..2269163 100644 --- a/clients/backup/backup_internal_test.go +++ b/clients/backup/backup_internal_test.go @@ -337,3 +337,61 @@ func (e *notFoundErr) Error() string { func (e *notFoundErr) ErrorCode() api.ErrorCode { return api.ERRORCODE_NOT_FOUND } + +func TestBackup_ServerIsLegacy(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + versionStr *string + legacy bool + wantErr string + }{ + { + name: "2.0.x", + versionStr: api.PtrString("2.0.7"), + legacy: true, + }, + { + name: "2.1.x", + versionStr: api.PtrString("2.1.0-RC1"), + }, + { + name: "nightly", + versionStr: api.PtrString("nightly-2020-01-01"), + }, + { + name: "dev", + versionStr: api.PtrString("some.custom-version.2"), + }, + { + name: "1.x", + versionStr: api.PtrString("1.9.3"), + wantErr: "InfluxDB v1 does not support the APIs", + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + healthApi := mock.NewMockHealthApi(ctrl) + healthApi.EXPECT().GetHealth(gomock.Any()).Return(api.ApiGetHealthRequest{ApiService: healthApi}) + healthApi.EXPECT().GetHealthExecute(gomock.Any()).Return(api.HealthCheck{Version: tc.versionStr}, nil) + + client := Client{HealthApi: healthApi} + isLegacy, err := client.serverIsLegacy(context.Background()) + + if tc.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.wantErr) + return + } + + require.NoError(t, err) + require.Equal(t, tc.legacy, isLegacy) + }) + } +} diff --git a/clients/restore/restore.go b/clients/restore/restore.go index 51f1b56..3d9d349 100644 --- a/clients/restore/restore.go +++ b/clients/restore/restore.go @@ -131,11 +131,15 @@ func (c Client) fullRestore(ctx context.Context, path string) error { return fmt.Errorf("failed to open local KV backup at %q: %w", filepath.Join(path, c.manifest.KV.FileName), err) } defer kvBytes.Close() - sqlBytes, err := c.readFileGzipped(path, c.manifest.SQL) - if err != nil { - return fmt.Errorf("failed to open local SQL backup at %q: %w", filepath.Join(path, c.manifest.SQL.FileName), err) + + var sqlBytes io.ReadCloser + if c.manifest.SQL != nil { + sqlBytes, err = c.readFileGzipped(path, *c.manifest.SQL) + if err != nil { + return fmt.Errorf("failed to open local SQL backup at %q: %w", filepath.Join(path, c.manifest.SQL.FileName), err) + } + defer sqlBytes.Close() } - defer sqlBytes.Close() // Upload metadata snapshots to the server. log.Println("INFO: Restoring KV snapshot") @@ -146,13 +150,17 @@ func (c Client) fullRestore(ctx context.Context, path string) error { Execute(); err != nil { return fmt.Errorf("failed to restore KV snapshot: %w", err) } - log.Println("INFO: Restoring SQL snapshot") - if err := c.PostRestoreSQL(ctx). - ContentEncoding("gzip"). - ContentType("application/octet-stream"). - Body(sqlBytes). - Execute(); err != nil { - return fmt.Errorf("failed to restore SQL snapshot: %w", err) + + // TODO: Should we have some way of wiping out any existing SQL on the server-side in the case when there is no backup? + if c.manifest.SQL != nil { + log.Println("INFO: Restoring SQL snapshot") + if err := c.PostRestoreSQL(ctx). + ContentEncoding("gzip"). + ContentType("application/octet-stream"). + Body(sqlBytes). + Execute(); err != nil { + return fmt.Errorf("failed to restore SQL snapshot: %w", err) + } } // Drill down through bucket manifests to reach shard info, and upload it. diff --git a/cmd/influx/backup.go b/cmd/influx/backup.go index e54870f..0c2fbca 100644 --- a/cmd/influx/backup.go +++ b/cmd/influx/backup.go @@ -61,9 +61,11 @@ Examples: } params.Path = ctx.Args().Get(0) + api := getAPI(ctx) client := backup.Client{ CLI: getCLI(ctx), - BackupApi: getAPI(ctx).BackupApi.OnlyOSS(), + BackupApi: api.BackupApi.OnlyOSS(), + HealthApi: api.HealthApi.OnlyOSS(), } return client.Backup(getContext(ctx), ¶ms) }, diff --git a/internal/backup_restore/manifest.go b/internal/backup_restore/manifest.go index 876e545..d753c1d 100644 --- a/internal/backup_restore/manifest.go +++ b/internal/backup_restore/manifest.go @@ -39,7 +39,7 @@ const ManifestExtension = "manifest" type Manifest struct { KV ManifestFileEntry `json:"kv"` - SQL ManifestFileEntry `json:"sql"` + SQL *ManifestFileEntry `json:"sql,omitempty"` Buckets []ManifestBucketEntry `json:"buckets"` }