
* chore: update openapi * feat: update replication field names to match influx changes * feat: add new field to show replication queue sync progress * chore: update to master openapi
298 lines
8.0 KiB
Go
298 lines
8.0 KiB
Go
package replication
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/influxdata/influx-cli/v2/api"
|
|
"github.com/influxdata/influx-cli/v2/clients"
|
|
)
|
|
|
|
type Client struct {
|
|
clients.CLI
|
|
api.ReplicationsApi
|
|
api.OrganizationsApi
|
|
}
|
|
|
|
type CreateParams struct {
|
|
clients.OrgParams
|
|
Name string
|
|
Description string
|
|
RemoteID string
|
|
LocalBucketID string
|
|
RemoteBucketID string
|
|
RemoteBucketName string
|
|
MaxQueueSize int64
|
|
DropNonRetryableData bool
|
|
NoDropNonRetryableData bool
|
|
MaxAge int64
|
|
}
|
|
|
|
func (c Client) Create(ctx context.Context, params *CreateParams) error {
|
|
if params.RemoteBucketID == "" && params.RemoteBucketName == "" {
|
|
return fmt.Errorf("please supply one of: remote-bucket-id, remote-bucket-name")
|
|
}
|
|
orgID, err := params.GetOrgID(ctx, c.ActiveConfig, c.OrganizationsApi)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// set up a struct with required params
|
|
body := api.ReplicationCreationRequest{
|
|
Name: params.Name,
|
|
OrgID: orgID,
|
|
RemoteID: params.RemoteID,
|
|
LocalBucketID: params.LocalBucketID,
|
|
MaxQueueSizeBytes: params.MaxQueueSize,
|
|
MaxAgeSeconds: params.MaxAge,
|
|
}
|
|
|
|
if params.RemoteBucketID != "" {
|
|
body.RemoteBucketID = ¶ms.RemoteBucketID
|
|
} else {
|
|
body.RemoteBucketName = ¶ms.RemoteBucketName
|
|
}
|
|
|
|
// set optional params if specified
|
|
if params.Description != "" {
|
|
body.Description = ¶ms.Description
|
|
}
|
|
|
|
dropNonRetryableDataBoolPtr, err := dropNonRetryableDataBoolPtrFromFlags(params.DropNonRetryableData, params.NoDropNonRetryableData)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
body.DropNonRetryableData = dropNonRetryableDataBoolPtr
|
|
|
|
// send post request
|
|
res, err := c.PostReplication(ctx).ReplicationCreationRequest(body).Execute()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create replication stream %q: %w", params.Name, err)
|
|
}
|
|
|
|
// print confirmation of new replication stream
|
|
return c.printReplication(printReplicationOpts{replication: &res})
|
|
}
|
|
|
|
type ListParams struct {
|
|
clients.OrgParams
|
|
Name string
|
|
RemoteID string
|
|
LocalBucketID string
|
|
}
|
|
|
|
func (c Client) List(ctx context.Context, params *ListParams) error {
|
|
orgID, err := params.GetOrgID(ctx, c.ActiveConfig, c.OrganizationsApi)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// set up params
|
|
req := c.GetReplications(ctx).OrgID(orgID)
|
|
|
|
if params.Name != "" {
|
|
req = req.Name(params.Name)
|
|
}
|
|
|
|
if params.RemoteID != "" {
|
|
req = req.RemoteID(params.RemoteID)
|
|
}
|
|
|
|
if params.LocalBucketID != "" {
|
|
req = req.LocalBucketID(params.LocalBucketID)
|
|
}
|
|
|
|
// send get request
|
|
res, err := req.Execute()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get replication streams: %w", err)
|
|
}
|
|
|
|
// print replication stream info
|
|
printOpts := printReplicationOpts{}
|
|
if res.Replications != nil {
|
|
printOpts.replications = *res.Replications
|
|
} else {
|
|
return errors.New("no replication streams found for specified parameters")
|
|
}
|
|
|
|
return c.printReplication(printOpts)
|
|
}
|
|
|
|
type UpdateParams struct {
|
|
ReplicationID string
|
|
Name string
|
|
Description string
|
|
RemoteID string
|
|
RemoteBucketID string
|
|
MaxQueueSize int64
|
|
DropNonRetryableData bool
|
|
NoDropNonRetryableData bool
|
|
MaxAge int64
|
|
}
|
|
|
|
func (c Client) Update(ctx context.Context, params *UpdateParams) error {
|
|
// build request
|
|
body := api.ReplicationUpdateRequest{}
|
|
|
|
if params.Name != "" {
|
|
body.SetName(params.Name)
|
|
}
|
|
|
|
if params.Description != "" {
|
|
body.SetDescription(params.Description)
|
|
}
|
|
|
|
if params.RemoteID != "" {
|
|
body.SetRemoteID(params.RemoteID)
|
|
}
|
|
|
|
if params.RemoteBucketID != "" {
|
|
body.SetRemoteBucketID(params.RemoteBucketID)
|
|
}
|
|
|
|
if params.MaxQueueSize != 0 {
|
|
body.SetMaxQueueSizeBytes(params.MaxQueueSize)
|
|
}
|
|
|
|
dropNonRetryableDataBoolPtr, err := dropNonRetryableDataBoolPtrFromFlags(params.DropNonRetryableData, params.NoDropNonRetryableData)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if dropNonRetryableDataBoolPtr != nil {
|
|
body.SetDropNonRetryableData(*dropNonRetryableDataBoolPtr)
|
|
}
|
|
|
|
if params.MaxAge != 0 {
|
|
body.SetMaxAgeSeconds(params.MaxAge)
|
|
}
|
|
|
|
// send patch request
|
|
res, err := c.PatchReplicationByID(ctx, params.ReplicationID).ReplicationUpdateRequest(body).Execute()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update replication stream %q: %w", params.ReplicationID, err)
|
|
}
|
|
// print updated replication stream info
|
|
return c.printReplication(printReplicationOpts{replication: &res})
|
|
}
|
|
|
|
func (c Client) Delete(ctx context.Context, replicationID string) error {
|
|
// get replication stream via ID
|
|
connection, err := c.GetReplicationByID(ctx, replicationID).Execute()
|
|
if err != nil {
|
|
return fmt.Errorf("could not find replication stream with ID %q: %w", replicationID, err)
|
|
}
|
|
|
|
// send delete request
|
|
if err := c.DeleteReplicationByID(ctx, replicationID).Execute(); err != nil {
|
|
return fmt.Errorf("failed to delete replication stream %q: %w", replicationID, err)
|
|
}
|
|
|
|
// print deleted replication stream info
|
|
printOpts := printReplicationOpts{
|
|
replication: &connection,
|
|
deleted: true,
|
|
}
|
|
|
|
return c.printReplication(printOpts)
|
|
}
|
|
|
|
func (c Client) DeleteWithRemoteID(ctx context.Context, conn api.RemoteConnection) error {
|
|
reps, err := c.GetReplications(ctx).OrgID(conn.OrgID).RemoteID(conn.Id).Execute()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to find replication streams with remote ID %q: %w", conn.Id, err)
|
|
}
|
|
|
|
if reps.Replications != nil {
|
|
for _, rep := range reps.GetReplications() {
|
|
if err := c.DeleteReplicationByID(ctx, rep.Id).Execute(); err != nil {
|
|
return fmt.Errorf("failed to delete replication with ID %q: %w", rep.Id, err)
|
|
}
|
|
}
|
|
} else {
|
|
return fmt.Errorf("no replications found for remote ID %q", conn.Id)
|
|
}
|
|
|
|
printOpts := printReplicationOpts{
|
|
replications: reps.GetReplications(),
|
|
deleted: true,
|
|
}
|
|
|
|
return c.printReplication(printOpts)
|
|
}
|
|
|
|
type printReplicationOpts struct {
|
|
replication *api.Replication
|
|
replications []api.Replication
|
|
deleted bool
|
|
}
|
|
|
|
func (c Client) printReplication(opts printReplicationOpts) error {
|
|
if c.PrintAsJSON {
|
|
var v interface{}
|
|
if opts.replication != nil {
|
|
v = opts.replication
|
|
} else {
|
|
v = opts.replications
|
|
}
|
|
return c.PrintJSON(v)
|
|
}
|
|
|
|
headers := []string{"ID", "Name", "Org ID", "Remote ID", "Local Bucket ID", "Remote Bucket ID", "Remote Bucket Name",
|
|
"Remaining Bytes to be Synced", "Current Queue Bytes on Disk", "Max Queue Bytes", "Latest Status Code", "Drop Non-Retryable Data"}
|
|
if opts.deleted {
|
|
headers = append(headers, "Deleted")
|
|
}
|
|
|
|
if opts.replication != nil {
|
|
opts.replications = append(opts.replications, *opts.replication)
|
|
}
|
|
|
|
var rows []map[string]interface{}
|
|
for _, r := range opts.replications {
|
|
bucketID := r.GetRemoteBucketID()
|
|
if r.GetRemoteBucketName() != "" {
|
|
// This hides the default id that is required due to platform.ID implementation details
|
|
bucketID = ""
|
|
}
|
|
row := map[string]interface{}{
|
|
"ID": r.GetId(),
|
|
"Name": r.GetName(),
|
|
"Org ID": r.GetOrgID(),
|
|
"Remote ID": r.GetRemoteID(),
|
|
"Local Bucket ID": r.GetLocalBucketID(),
|
|
"Remote Bucket ID": bucketID,
|
|
"Remote Bucket Name": r.GetRemoteBucketName(),
|
|
"Remaining Bytes to be Synced": r.GetRemainingBytesToBeSynced(),
|
|
"Current Queue Bytes on Disk": r.GetCurrentQueueSizeBytes(),
|
|
"Max Queue Bytes": r.GetMaxQueueSizeBytes(),
|
|
"Latest Status Code": r.GetLatestResponseCode(),
|
|
"Drop Non-Retryable Data": r.GetDropNonRetryableData(),
|
|
}
|
|
if opts.deleted {
|
|
row["Deleted"] = true
|
|
}
|
|
rows = append(rows, row)
|
|
}
|
|
|
|
return c.PrintTable(headers, rows...)
|
|
}
|
|
|
|
func dropNonRetryableDataBoolPtrFromFlags(dropNonRetryableData, noDropNonRetryableData bool) (*bool, error) {
|
|
if dropNonRetryableData && noDropNonRetryableData {
|
|
return nil, errors.New("cannot specify both --drop-non-retryable-data and --no-drop-non-retryable-data at the same time")
|
|
}
|
|
|
|
if dropNonRetryableData {
|
|
return api.PtrBool(true), nil
|
|
}
|
|
|
|
if noDropNonRetryableData {
|
|
return api.PtrBool(false), nil
|
|
}
|
|
|
|
return nil, nil
|
|
}
|