refactor: Move from global state to functions (#53)
This commit represents a few experiments of features I've used in Cobra 1. Uses cli.GenericFlag to encapsulate parsing and validation of flag values at parse time. This removes the burden from the individual CLI commands to parse and validate args and options. 2. Add influxid.ID that may be used by any flag that requires an Influx ID. influxid.ID parses and validates string value is a valid ID, removing this burden from individual commands and ensuring valid values before the command actions begins. 3. Binds cli.Flags directly to params structures to directly capture the values when parsing flags. 4. Moves from global state to local builder functions for the majority of the commands. This allows the commands to bind to flag variables reducing the repeated ctx.String(), ctx.Int(), etc 5. Leverages the BeforeFunc to create middleware and inject the CLI and API client into commands, saving the repeated boilerplate across all of the instantiated commands. This is extensible, so additional middleware can be appends using the middleware.WithBeforeFns
This commit is contained in:
parent
0b4d753728
commit
3414e1a983
@ -2,215 +2,227 @@ package main
|
||||
|
||||
import (
|
||||
"github.com/influxdata/influx-cli/v2/internal"
|
||||
"github.com/influxdata/influx-cli/v2/pkg/cli/middleware"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
var bucketCmd = cli.Command{
|
||||
Name: "bucket",
|
||||
Usage: "Bucket management commands",
|
||||
Subcommands: []*cli.Command{
|
||||
{
|
||||
Name: "create",
|
||||
Usage: "Create bucket",
|
||||
Flags: append(
|
||||
commonFlags,
|
||||
&cli.StringFlag{
|
||||
Name: "name",
|
||||
Usage: "New bucket name",
|
||||
Aliases: []string{"n"},
|
||||
EnvVars: []string{"INFLUX_BUCKET_NAME"},
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "description",
|
||||
Usage: "Description of the bucket that will be created",
|
||||
Aliases: []string{"d"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "retention",
|
||||
Usage: "Duration bucket will retain data, or 0 for infinite",
|
||||
Aliases: []string{"r"},
|
||||
DefaultText: "infinite",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "shard-group-duration",
|
||||
Usage: "Shard group duration used internally by the storage engine",
|
||||
DefaultText: "calculated from retention",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org-id",
|
||||
Usage: "The ID of the organization",
|
||||
EnvVars: []string{"INFLUX_ORG_ID"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org",
|
||||
Usage: "The name of the organization",
|
||||
Aliases: []string{"o"},
|
||||
EnvVars: []string{"INFLUX_ORG"},
|
||||
},
|
||||
),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
cli, err := newCli(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
client, err := newApiClient(ctx, cli, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
clients := internal.BucketsClients{
|
||||
BucketApi: client.BucketsApi,
|
||||
OrgApi: client.OrganizationsApi,
|
||||
}
|
||||
return cli.BucketsCreate(standardCtx(ctx), &clients, &internal.BucketsCreateParams{
|
||||
OrgID: ctx.String("org-id"),
|
||||
OrgName: ctx.String("org"),
|
||||
Name: ctx.String("name"),
|
||||
Description: ctx.String("description"),
|
||||
Retention: ctx.String("retention"),
|
||||
ShardGroupDuration: ctx.String("shard-group-duration"),
|
||||
})
|
||||
},
|
||||
func withBucketsClient() cli.BeforeFunc {
|
||||
return middleware.WithBeforeFns(
|
||||
withCli(),
|
||||
withApi(true),
|
||||
func(ctx *cli.Context) error {
|
||||
client := getAPI(ctx)
|
||||
ctx.App.Metadata["bucketsClient"] = internal.BucketsClients{
|
||||
BucketApi: client.BucketsApi,
|
||||
OrgApi: client.OrganizationsApi,
|
||||
}
|
||||
return nil
|
||||
},
|
||||
{
|
||||
Name: "delete",
|
||||
Usage: "Delete bucket",
|
||||
Flags: append(
|
||||
commonFlags,
|
||||
&cli.StringFlag{
|
||||
Name: "id",
|
||||
Usage: "The bucket ID, required if name isn't provided",
|
||||
Aliases: []string{"i"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "name",
|
||||
Usage: "The bucket name, org or org-id will be required by choosing this",
|
||||
Aliases: []string{"n"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org-id",
|
||||
Usage: "The ID of the organization",
|
||||
EnvVars: []string{"INFLUX_ORG_ID"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org",
|
||||
Usage: "The name of the organization",
|
||||
Aliases: []string{"o"},
|
||||
EnvVars: []string{"INFLUX_ORG"},
|
||||
},
|
||||
),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
cli, err := newCli(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
client, err := newApiClient(ctx, cli, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return cli.BucketsDelete(standardCtx(ctx), client.BucketsApi, &internal.BucketsDeleteParams{
|
||||
ID: ctx.String("id"),
|
||||
Name: ctx.String("name"),
|
||||
OrgID: ctx.String("org-id"),
|
||||
OrgName: ctx.String("org"),
|
||||
})
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "list",
|
||||
Usage: "List buckets",
|
||||
Aliases: []string{"find", "ls"},
|
||||
Flags: append(
|
||||
commonFlags,
|
||||
&cli.StringFlag{
|
||||
Name: "name",
|
||||
Usage: "The bucket name",
|
||||
Aliases: []string{"n"},
|
||||
EnvVars: []string{"INFLUX_BUCKET_NAME"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org-id",
|
||||
Usage: "The ID of the organization",
|
||||
EnvVars: []string{"INFLUX_ORG_ID"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org",
|
||||
Usage: "The name of the organization",
|
||||
Aliases: []string{"o"},
|
||||
EnvVars: []string{"INFLUX_ORG"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "id",
|
||||
Usage: "The bucket ID",
|
||||
Aliases: []string{"i"},
|
||||
},
|
||||
),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
cli, err := newCli(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
client, err := newApiClient(ctx, cli, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return cli.BucketsList(standardCtx(ctx), client.BucketsApi, &internal.BucketsListParams{
|
||||
ID: ctx.String("id"),
|
||||
Name: ctx.String("name"),
|
||||
OrgID: ctx.String("org-id"),
|
||||
OrgName: ctx.String("org"),
|
||||
})
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "update",
|
||||
Usage: "Update bucket",
|
||||
Flags: append(
|
||||
commonFlags,
|
||||
&cli.StringFlag{
|
||||
Name: "name",
|
||||
Usage: "New name to set on the bucket",
|
||||
Aliases: []string{"n"},
|
||||
EnvVars: []string{"INFLUX_BUCKET_NAME"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "id",
|
||||
Usage: "The bucket ID",
|
||||
Aliases: []string{"i"},
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "description",
|
||||
Usage: "New description to set on the bucket",
|
||||
Aliases: []string{"d"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "retention",
|
||||
Usage: "New retention duration to set on the bucket, or 0 for infinite",
|
||||
Aliases: []string{"r"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "shard-group-duration",
|
||||
Usage: "New shard group duration to set on the bucket, or 0 to have the server calculate a value",
|
||||
},
|
||||
),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
cli, err := newCli(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
client, err := newApiClient(ctx, cli, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return cli.BucketsUpdate(standardCtx(ctx), client.BucketsApi, &internal.BucketsUpdateParams{
|
||||
ID: ctx.String("id"),
|
||||
Name: ctx.String("name"),
|
||||
Description: ctx.String("description"),
|
||||
Retention: ctx.String("retention"),
|
||||
ShardGroupDuration: ctx.String("shard-group-duration"),
|
||||
})
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func getBucketsClient(ctx *cli.Context) internal.BucketsClients {
|
||||
i, ok := ctx.App.Metadata["bucketsClient"].(internal.BucketsClients)
|
||||
if !ok {
|
||||
panic("missing buckets client")
|
||||
}
|
||||
return i
|
||||
}
|
||||
|
||||
func newBucketCmd() *cli.Command {
|
||||
return &cli.Command{
|
||||
Name: "bucket",
|
||||
Usage: "Bucket management commands",
|
||||
Subcommands: []*cli.Command{
|
||||
newBucketCreateCmd(),
|
||||
newBucketDeleteCmd(),
|
||||
newBucketListCmd(),
|
||||
newBucketUpdateCmd(),
|
||||
},
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func newBucketCreateCmd() *cli.Command {
|
||||
var params internal.BucketsCreateParams
|
||||
return &cli.Command{
|
||||
Name: "create",
|
||||
Usage: "Create bucket",
|
||||
Before: withBucketsClient(),
|
||||
Flags: append(
|
||||
commonFlags,
|
||||
&cli.StringFlag{
|
||||
Name: "name",
|
||||
Usage: "New bucket name",
|
||||
Aliases: []string{"n"},
|
||||
EnvVars: []string{"INFLUX_BUCKET_NAME"},
|
||||
Destination: ¶ms.Name,
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "description",
|
||||
Usage: "Description of the bucket that will be created",
|
||||
Aliases: []string{"d"},
|
||||
Destination: ¶ms.Description,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "retention",
|
||||
Usage: "Duration bucket will retain data, or 0 for infinite",
|
||||
Aliases: []string{"r"},
|
||||
DefaultText: "infinite",
|
||||
Destination: ¶ms.Retention,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "shard-group-duration",
|
||||
Usage: "Shard group duration used internally by the storage engine",
|
||||
DefaultText: "calculated from retention",
|
||||
Destination: ¶ms.ShardGroupDuration,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org-id",
|
||||
Usage: "The ID of the organization",
|
||||
EnvVars: []string{"INFLUX_ORG_ID"},
|
||||
Destination: ¶ms.OrgID,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org",
|
||||
Usage: "The name of the organization",
|
||||
Aliases: []string{"o"},
|
||||
EnvVars: []string{"INFLUX_ORG"},
|
||||
Destination: ¶ms.OrgName,
|
||||
},
|
||||
),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
clients := getBucketsClient(ctx)
|
||||
return getCLI(ctx).BucketsCreate(ctx.Context, &clients, ¶ms)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newBucketDeleteCmd() *cli.Command {
|
||||
var params internal.BucketsDeleteParams
|
||||
return &cli.Command{
|
||||
Name: "delete",
|
||||
Usage: "Delete bucket",
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
||||
Flags: append(
|
||||
commonFlags,
|
||||
&cli.StringFlag{
|
||||
Name: "id",
|
||||
Usage: "The bucket ID, required if name isn't provided",
|
||||
Aliases: []string{"i"},
|
||||
Destination: ¶ms.ID,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "name",
|
||||
Usage: "The bucket name, org or org-id will be required by choosing this",
|
||||
Aliases: []string{"n"},
|
||||
Destination: ¶ms.Name,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org-id",
|
||||
Usage: "The ID of the organization",
|
||||
EnvVars: []string{"INFLUX_ORG_ID"},
|
||||
Destination: ¶ms.OrgID,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org",
|
||||
Usage: "The name of the organization",
|
||||
Aliases: []string{"o"},
|
||||
EnvVars: []string{"INFLUX_ORG"},
|
||||
Destination: ¶ms.OrgName,
|
||||
},
|
||||
),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
return getCLI(ctx).BucketsDelete(ctx.Context, getAPI(ctx).BucketsApi, ¶ms)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newBucketListCmd() *cli.Command {
|
||||
var params internal.BucketsListParams
|
||||
return &cli.Command{
|
||||
Name: "list",
|
||||
Usage: "List buckets",
|
||||
Aliases: []string{"find", "ls"},
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
||||
Flags: append(
|
||||
commonFlags,
|
||||
&cli.StringFlag{
|
||||
Name: "id",
|
||||
Usage: "The bucket ID, required if name isn't provided",
|
||||
Aliases: []string{"i"},
|
||||
Destination: ¶ms.ID,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "name",
|
||||
Usage: "The bucket name, org or org-id will be required by choosing this",
|
||||
Aliases: []string{"n"},
|
||||
Destination: ¶ms.Name,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org-id",
|
||||
Usage: "The ID of the organization",
|
||||
EnvVars: []string{"INFLUX_ORG_ID"},
|
||||
Destination: ¶ms.OrgID,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org",
|
||||
Usage: "The name of the organization",
|
||||
Aliases: []string{"o"},
|
||||
EnvVars: []string{"INFLUX_ORG"},
|
||||
Destination: ¶ms.OrgName,
|
||||
},
|
||||
),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
return getCLI(ctx).BucketsList(ctx.Context, getAPI(ctx).BucketsApi, ¶ms)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newBucketUpdateCmd() *cli.Command {
|
||||
var params internal.BucketsUpdateParams
|
||||
return &cli.Command{
|
||||
Name: "update",
|
||||
Usage: "Update bucket",
|
||||
Aliases: []string{"find", "ls"},
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
||||
Flags: append(
|
||||
commonFlags,
|
||||
&cli.StringFlag{
|
||||
Name: "name",
|
||||
Usage: "New name to set on the bucket",
|
||||
Aliases: []string{"n"},
|
||||
EnvVars: []string{"INFLUX_BUCKET_NAME"},
|
||||
Destination: ¶ms.Name,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "id",
|
||||
Usage: "The bucket ID",
|
||||
Aliases: []string{"i"},
|
||||
Required: true,
|
||||
Destination: ¶ms.ID,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "description",
|
||||
Usage: "New description to set on the bucket",
|
||||
Aliases: []string{"d"},
|
||||
Destination: ¶ms.Description,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "retention",
|
||||
Usage: "New retention duration to set on the bucket, or 0 for infinite",
|
||||
Aliases: []string{"r"},
|
||||
Destination: ¶ms.Retention,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "shard-group-duration",
|
||||
Usage: "New shard group duration to set on the bucket, or 0 to have the server calculate a value",
|
||||
Destination: ¶ms.ShardGroupDuration,
|
||||
},
|
||||
),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
return getCLI(ctx).BucketsUpdate(ctx.Context, getAPI(ctx).BucketsApi, ¶ms)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"github.com/influxdata/influx-cli/v2/internal/api"
|
||||
"github.com/influxdata/influx-cli/v2/internal/config"
|
||||
"github.com/influxdata/influx-cli/v2/internal/stdio"
|
||||
"github.com/influxdata/influx-cli/v2/pkg/cli/middleware"
|
||||
"github.com/influxdata/influx-cli/v2/pkg/signals"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
@ -185,26 +186,75 @@ func newApiClient(ctx *cli.Context, cli *internal.CLI, injectToken bool) (*api.A
|
||||
return api.NewAPIClient(apiConfig), nil
|
||||
}
|
||||
|
||||
// standardCtx returns a context that will cancel on SIGINT and SIGTERM.
|
||||
func standardCtx(ctx *cli.Context) context.Context {
|
||||
return signals.WithStandardSignals(ctx.Context)
|
||||
}
|
||||
|
||||
var app = cli.App{
|
||||
Name: "influx",
|
||||
Usage: "Influx Client",
|
||||
UsageText: "influx [command]",
|
||||
Commands: []*cli.Command{
|
||||
&versionCmd,
|
||||
&pingCmd,
|
||||
&setupCmd,
|
||||
&writeCmd,
|
||||
&bucketCmd,
|
||||
newVersionCmd(),
|
||||
newPingCmd(),
|
||||
newSetupCmd(),
|
||||
newWriteCmd(),
|
||||
newBucketCmd(),
|
||||
},
|
||||
}
|
||||
|
||||
func withCli() cli.BeforeFunc {
|
||||
return func(ctx *cli.Context) error {
|
||||
c, err := newCli(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx.App.Metadata["cli"] = c
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func getCLI(ctx *cli.Context) *internal.CLI {
|
||||
i, ok := ctx.App.Metadata["cli"].(*internal.CLI)
|
||||
if !ok {
|
||||
panic("missing CLI")
|
||||
}
|
||||
return i
|
||||
}
|
||||
|
||||
func withApi(injectToken bool) cli.BeforeFunc {
|
||||
key := "api-no-token"
|
||||
if injectToken {
|
||||
key = "api"
|
||||
}
|
||||
|
||||
makeFn := func(ctx *cli.Context) error {
|
||||
c := getCLI(ctx)
|
||||
apiClient, err := newApiClient(ctx, c, injectToken)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx.App.Metadata[key] = apiClient
|
||||
return nil
|
||||
}
|
||||
return middleware.WithBeforeFns(makeFn)
|
||||
}
|
||||
|
||||
func getAPI(ctx *cli.Context) *api.APIClient {
|
||||
i, ok := ctx.App.Metadata["api"].(*api.APIClient)
|
||||
if !ok {
|
||||
panic("missing APIClient with token")
|
||||
}
|
||||
return i
|
||||
}
|
||||
|
||||
func getAPINoToken(ctx *cli.Context) *api.APIClient {
|
||||
i, ok := ctx.App.Metadata["api-no-token"].(*api.APIClient)
|
||||
if !ok {
|
||||
panic("missing APIClient without token")
|
||||
}
|
||||
return i
|
||||
}
|
||||
|
||||
func main() {
|
||||
if err := app.Run(os.Args); err != nil {
|
||||
ctx := signals.WithStandardSignals(context.Background())
|
||||
if err := app.RunContext(ctx, os.Args); err != nil {
|
||||
_, _ = fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
@ -1,20 +1,18 @@
|
||||
package main
|
||||
|
||||
import "github.com/urfave/cli/v2"
|
||||
import (
|
||||
"github.com/influxdata/influx-cli/v2/pkg/cli/middleware"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
var pingCmd = cli.Command{
|
||||
Name: "ping",
|
||||
Usage: "Check the InfluxDB /health endpoint",
|
||||
Flags: coreFlags,
|
||||
Action: func(ctx *cli.Context) error {
|
||||
cli, err := newCli(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
client, err := newApiClient(ctx, cli, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return cli.Ping(standardCtx(ctx), client.HealthApi)
|
||||
},
|
||||
func newPingCmd() *cli.Command {
|
||||
return &cli.Command{
|
||||
Name: "ping",
|
||||
Usage: "Check the InfluxDB /health endpoint",
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(false)),
|
||||
Flags: coreFlags,
|
||||
Action: func(ctx *cli.Context) error {
|
||||
return getCLI(ctx).Ping(ctx.Context, getAPINoToken(ctx).HealthApi)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -2,76 +2,72 @@ package main
|
||||
|
||||
import (
|
||||
"github.com/influxdata/influx-cli/v2/internal"
|
||||
"github.com/influxdata/influx-cli/v2/pkg/cli/middleware"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
var setupCmd = cli.Command{
|
||||
Name: "setup",
|
||||
Usage: "Setup instance with initial user, org, bucket",
|
||||
Flags: append(
|
||||
commonFlagsNoToken,
|
||||
&cli.StringFlag{
|
||||
Name: "username",
|
||||
Usage: "Name of initial user to create",
|
||||
Aliases: []string{"u"},
|
||||
func newSetupCmd() *cli.Command {
|
||||
var params internal.SetupParams
|
||||
return &cli.Command{
|
||||
Name: "setup",
|
||||
Usage: "Setup instance with initial user, org, bucket",
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(false)),
|
||||
Flags: append(
|
||||
commonFlagsNoToken,
|
||||
&cli.StringFlag{
|
||||
Name: "username",
|
||||
Usage: "Name of initial user to create",
|
||||
Aliases: []string{"u"},
|
||||
Destination: ¶ms.Username,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "password",
|
||||
Usage: "Password to set on initial user",
|
||||
Aliases: []string{"p"},
|
||||
Destination: ¶ms.Password,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: tokenFlag,
|
||||
Usage: "Auth token to set on the initial user",
|
||||
Aliases: []string{"t"},
|
||||
EnvVars: []string{"INFLUX_TOKEN"},
|
||||
DefaultText: "auto-generated",
|
||||
Destination: ¶ms.AuthToken,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org",
|
||||
Usage: "Name of initial organization to create",
|
||||
Aliases: []string{"o"},
|
||||
Destination: ¶ms.Org,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "bucket",
|
||||
Usage: "Name of initial bucket to create",
|
||||
Aliases: []string{"b"},
|
||||
Destination: ¶ms.Bucket,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "retention",
|
||||
Usage: "Duration initial bucket will retain data, or 0 for infinite",
|
||||
Aliases: []string{"r"},
|
||||
DefaultText: "infinite",
|
||||
Destination: ¶ms.Retention,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "force",
|
||||
Usage: "Skip confirmation prompt",
|
||||
Aliases: []string{"f"},
|
||||
Destination: ¶ms.Force,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "name",
|
||||
Usage: "Name to set on CLI config generated for the InfluxDB instance, required if other configs exist",
|
||||
Aliases: []string{"n"},
|
||||
Destination: ¶ms.ConfigName,
|
||||
},
|
||||
),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
return getCLI(ctx).Setup(ctx.Context, getAPINoToken(ctx).SetupApi, ¶ms)
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "password",
|
||||
Usage: "Password to set on initial user",
|
||||
Aliases: []string{"p"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: tokenFlag,
|
||||
Usage: "Auth token to set on the initial user",
|
||||
Aliases: []string{"t"},
|
||||
EnvVars: []string{"INFLUX_TOKEN"},
|
||||
DefaultText: "auto-generated",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org",
|
||||
Usage: "Name of initial organization to create",
|
||||
Aliases: []string{"o"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "bucket",
|
||||
Usage: "Name of initial bucket to create",
|
||||
Aliases: []string{"b"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "retention",
|
||||
Usage: "Duration initial bucket will retain data, or 0 for infinite",
|
||||
Aliases: []string{"r"},
|
||||
DefaultText: "infinite",
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "force",
|
||||
Usage: "Skip confirmation prompt",
|
||||
Aliases: []string{"f"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "name",
|
||||
Usage: "Name to set on CLI config generated for the InfluxDB instance, required if other configs exist",
|
||||
Aliases: []string{"n"},
|
||||
},
|
||||
),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
cli, err := newCli(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
client, err := newApiClient(ctx, cli, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return cli.Setup(standardCtx(ctx), client.SetupApi, &internal.SetupParams{
|
||||
Username: ctx.String("username"),
|
||||
Password: ctx.String("password"),
|
||||
AuthToken: ctx.String(tokenFlag),
|
||||
Org: ctx.String("org"),
|
||||
Bucket: ctx.String("bucket"),
|
||||
Retention: ctx.String("retention"),
|
||||
Force: ctx.Bool("force"),
|
||||
ConfigName: ctx.String("name"),
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -6,11 +6,13 @@ import (
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
var versionCmd = cli.Command{
|
||||
Name: "version",
|
||||
Usage: "Print the influx CLI version",
|
||||
Action: func(*cli.Context) error {
|
||||
fmt.Printf("Influx CLI %s (git: %s) build_date: %s\n", version, commit, date)
|
||||
return nil
|
||||
},
|
||||
func newVersionCmd() *cli.Command {
|
||||
return &cli.Command{
|
||||
Name: "version",
|
||||
Usage: "Print the influx CLI version",
|
||||
Action: func(*cli.Context) error {
|
||||
fmt.Printf("Influx CLI %s (git: %s) build_date: %s\n", version, commit, date)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -11,271 +11,236 @@ import (
|
||||
"github.com/influxdata/influx-cli/v2/internal/batcher"
|
||||
"github.com/influxdata/influx-cli/v2/internal/linereader"
|
||||
"github.com/influxdata/influx-cli/v2/internal/throttler"
|
||||
"github.com/influxdata/influx-cli/v2/pkg/cli/middleware"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
var writeFlags = append(
|
||||
commonFlagsNoPrint,
|
||||
&cli.StringFlag{
|
||||
Name: "bucket-id",
|
||||
Usage: "The ID of destination bucket",
|
||||
EnvVars: []string{"INFLUX_BUCKET_ID"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "bucket",
|
||||
Usage: "The name of destination bucket",
|
||||
Aliases: []string{"b"},
|
||||
EnvVars: []string{"INFLUX_BUCKET_NAME"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org-id",
|
||||
Usage: "The ID of the organization",
|
||||
EnvVars: []string{"INFLUX_ORG_ID"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org",
|
||||
Usage: "The name of the organization",
|
||||
Aliases: []string{"o"},
|
||||
EnvVars: []string{"INFLUX_ORG"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "precision",
|
||||
Usage: "Precision of the timestamps of the lines",
|
||||
Aliases: []string{"p"},
|
||||
EnvVars: []string{"INFLUX_PRECISION"},
|
||||
Value: "ns",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "format",
|
||||
Usage: "Input format, either 'lp' (Line Protocol) or 'csv' (Comma Separated Values)",
|
||||
DefaultText: "'lp' unless '.csv' extension",
|
||||
},
|
||||
&cli.StringSliceFlag{
|
||||
Name: "header",
|
||||
Usage: "Header prepends lines to input data",
|
||||
},
|
||||
&cli.StringSliceFlag{
|
||||
Name: "file",
|
||||
Usage: "The path to the file to import",
|
||||
Aliases: []string{"f"},
|
||||
TakesFile: true,
|
||||
},
|
||||
&cli.StringSliceFlag{
|
||||
Name: "url",
|
||||
Usage: "The URL to import data from",
|
||||
Aliases: []string{"u"},
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "debug",
|
||||
Usage: "Log CSV columns to stderr before reading data rows",
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "skipRowOnError",
|
||||
Usage: "Log CSV data errors to stderr and continue with CSV processing",
|
||||
},
|
||||
// NOTE: The old CLI allowed this flag to be used as an int _or_ a bool, with the bool form being
|
||||
// short-hand for N=1. urfave/cli isn't that flexible.
|
||||
&cli.IntFlag{
|
||||
Name: "skipHeader",
|
||||
Usage: "Skip the first <n> rows from input data",
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "max-line-length",
|
||||
Usage: "Specifies the maximum number of bytes that can be read for a single line",
|
||||
Value: 16_000_000,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "xIgnoreDataTypeInColumnName",
|
||||
Usage: "Ignores dataType which could be specified after ':' in column name",
|
||||
Hidden: true,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "encoding",
|
||||
Usage: "Character encoding of input files or stdin",
|
||||
Value: "UTF-8",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "errors-file",
|
||||
Usage: "The path to the file to write rejected rows to",
|
||||
TakesFile: true,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "rate-limit",
|
||||
Usage: `Throttles write, examples: "5 MB / 5 min" , "17kBs"`,
|
||||
DefaultText: "no throttling",
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "compression",
|
||||
Usage: "Input compression, either 'none' or 'gzip'",
|
||||
DefaultText: "'none' unless an input has a '.gz' extension",
|
||||
},
|
||||
)
|
||||
type writeParams struct {
|
||||
Files cli.StringSlice
|
||||
URLs cli.StringSlice
|
||||
Format linereader.InputFormat
|
||||
Compression linereader.InputCompression
|
||||
Encoding string
|
||||
|
||||
var writeCmd = cli.Command{
|
||||
Name: "write",
|
||||
Usage: "Write points to InfluxDB",
|
||||
Description: "Write data to InfluxDB via stdin, or add an entire file specified with the -f flag",
|
||||
Flags: writeFlags,
|
||||
Action: func(ctx *cli.Context) error {
|
||||
format, err := parseFormat(ctx.String("format"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
compression, err := parseCompression(ctx.String("compression"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
precision, err := parsePrecision(ctx.String("precision"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// CSV-specific options.
|
||||
Headers cli.StringSlice
|
||||
SkipRowOnError bool
|
||||
SkipHeader int
|
||||
IgnoreDataTypeInColumnName bool
|
||||
Debug bool
|
||||
|
||||
var errorOut io.Writer
|
||||
if ctx.IsSet("errors-file") {
|
||||
errorFile, err := os.Open(ctx.String("errors-file"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open errors-file: %w", err)
|
||||
}
|
||||
defer errorFile.Close()
|
||||
errorOut = errorFile
|
||||
}
|
||||
ErrorsFile string
|
||||
MaxLineLength int
|
||||
RateLimit throttler.BytesPerSec
|
||||
|
||||
throttler, err := throttler.NewThrottler(ctx.String("rate-limit"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cli, err := newCli(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
client, err := newApiClient(ctx, cli, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
writeClients := &internal.WriteClients{
|
||||
Client: client.WriteApi,
|
||||
Reader: &linereader.MultiInputLineReader{
|
||||
StdIn: os.Stdin,
|
||||
HttpClient: http.DefaultClient,
|
||||
ErrorOut: errorOut,
|
||||
Args: ctx.Args().Slice(),
|
||||
Files: ctx.StringSlice("file"),
|
||||
URLs: ctx.StringSlice("url"),
|
||||
Format: format,
|
||||
Compression: compression,
|
||||
Encoding: ctx.String("encoding"),
|
||||
Headers: ctx.StringSlice("header"),
|
||||
SkipRowOnError: ctx.Bool("skipRowOnError"),
|
||||
SkipHeader: ctx.Int("skipHeader"),
|
||||
IgnoreDataTypeInColumnName: ctx.Bool("xIgnoreDataTypeInColumnName"),
|
||||
Debug: ctx.Bool("debug"),
|
||||
},
|
||||
Throttler: throttler,
|
||||
Writer: &batcher.BufferBatcher{
|
||||
MaxFlushBytes: batcher.DefaultMaxBytes,
|
||||
MaxFlushInterval: batcher.DefaultInterval,
|
||||
MaxLineLength: ctx.Int("max-line-length"),
|
||||
},
|
||||
}
|
||||
internal.WriteParams
|
||||
}
|
||||
|
||||
return cli.Write(standardCtx(ctx), writeClients, &internal.WriteParams{
|
||||
BucketID: ctx.String("bucket-id"),
|
||||
BucketName: ctx.String("bucket"),
|
||||
OrgID: ctx.String("org-id"),
|
||||
OrgName: ctx.String("org"),
|
||||
Precision: precision,
|
||||
})
|
||||
},
|
||||
Subcommands: []*cli.Command{
|
||||
{
|
||||
Name: "dryrun",
|
||||
Usage: "Write to stdout instead of InfluxDB",
|
||||
Description: "Write protocol lines to stdout instead of InfluxDB. Troubleshoot conversion from CSV to line protocol",
|
||||
Flags: writeFlags,
|
||||
Action: func(ctx *cli.Context) error {
|
||||
format, err := parseFormat(ctx.String("format"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
compression, err := parseCompression(ctx.String("compression"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
func (p *writeParams) makeLineReader(args []string, errorOut io.Writer) *linereader.MultiInputLineReader {
|
||||
return &linereader.MultiInputLineReader{
|
||||
StdIn: os.Stdin,
|
||||
HttpClient: http.DefaultClient,
|
||||
ErrorOut: errorOut,
|
||||
Args: args,
|
||||
Files: p.Files.Value(),
|
||||
URLs: p.URLs.Value(),
|
||||
Format: p.Format,
|
||||
Compression: p.Compression,
|
||||
Encoding: p.Encoding,
|
||||
Headers: p.Headers.Value(),
|
||||
SkipRowOnError: p.SkipRowOnError,
|
||||
SkipHeader: p.SkipHeader,
|
||||
IgnoreDataTypeInColumnName: p.IgnoreDataTypeInColumnName,
|
||||
Debug: p.Debug,
|
||||
}
|
||||
}
|
||||
|
||||
var errorOut io.Writer
|
||||
if ctx.IsSet("errors-file") {
|
||||
errorFile, err := os.Open(ctx.String("errors-file"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open errors-file: %w", err)
|
||||
}
|
||||
defer errorFile.Close()
|
||||
errorOut = errorFile
|
||||
}
|
||||
func (p *writeParams) makeErrorFile() (*os.File, error) {
|
||||
if p.ErrorsFile == "" {
|
||||
return nil, nil
|
||||
}
|
||||
errorFile, err := os.Open(p.ErrorsFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open errors-file: %w", err)
|
||||
}
|
||||
return errorFile, nil
|
||||
}
|
||||
|
||||
cli, err := newCli(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reader := &linereader.MultiInputLineReader{
|
||||
StdIn: os.Stdin,
|
||||
HttpClient: http.DefaultClient,
|
||||
ErrorOut: errorOut,
|
||||
Args: ctx.Args().Slice(),
|
||||
Files: ctx.StringSlice("file"),
|
||||
URLs: ctx.StringSlice("url"),
|
||||
Format: format,
|
||||
Compression: compression,
|
||||
Encoding: ctx.String("encoding"),
|
||||
Headers: ctx.StringSlice("header"),
|
||||
SkipRowOnError: ctx.Bool("skipRowOnError"),
|
||||
SkipHeader: ctx.Int("skipHeader"),
|
||||
IgnoreDataTypeInColumnName: ctx.Bool("xIgnoreDataTypeInColumnName"),
|
||||
Debug: ctx.Bool("debug"),
|
||||
}
|
||||
|
||||
return cli.WriteDryRun(standardCtx(ctx), reader)
|
||||
},
|
||||
func (p *writeParams) Flags() []cli.Flag {
|
||||
return []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "bucket-id",
|
||||
Usage: "The ID of destination bucket",
|
||||
EnvVars: []string{"INFLUX_BUCKET_ID"},
|
||||
Destination: &p.BucketID,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "bucket",
|
||||
Usage: "The name of destination bucket",
|
||||
Aliases: []string{"b"},
|
||||
EnvVars: []string{"INFLUX_BUCKET_NAME"},
|
||||
Destination: &p.BucketName,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org-id",
|
||||
Usage: "The ID of the organization",
|
||||
EnvVars: []string{"INFLUX_ORG_ID"},
|
||||
Destination: &p.OrgID,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "org",
|
||||
Usage: "The name of the organization",
|
||||
Aliases: []string{"o"},
|
||||
EnvVars: []string{"INFLUX_ORG"},
|
||||
Destination: &p.OrgName,
|
||||
},
|
||||
&cli.GenericFlag{
|
||||
Name: "precision",
|
||||
Usage: "Precision of the timestamps of the lines",
|
||||
Aliases: []string{"p"},
|
||||
EnvVars: []string{"INFLUX_PRECISION"},
|
||||
Value: &p.Precision,
|
||||
},
|
||||
&cli.GenericFlag{
|
||||
Name: "format",
|
||||
Usage: "Input format, either 'lp' (Line Protocol) or 'csv' (Comma Separated Values)",
|
||||
DefaultText: "'lp' unless '.csv' extension",
|
||||
Value: &p.Format,
|
||||
},
|
||||
&cli.StringSliceFlag{
|
||||
Name: "header",
|
||||
Usage: "Header prepends lines to input data",
|
||||
Destination: &p.Headers,
|
||||
},
|
||||
&cli.StringSliceFlag{
|
||||
Name: "file",
|
||||
Usage: "The path to the file to import",
|
||||
Aliases: []string{"f"},
|
||||
TakesFile: true,
|
||||
Destination: &p.Files,
|
||||
},
|
||||
&cli.StringSliceFlag{
|
||||
Name: "url",
|
||||
Usage: "The URL to import data from",
|
||||
Aliases: []string{"u"},
|
||||
Destination: &p.URLs,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "debug",
|
||||
Usage: "Log CSV columns to stderr before reading data rows",
|
||||
Destination: &p.Debug,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "skipRowOnError",
|
||||
Usage: "Log CSV data errors to stderr and continue with CSV processing",
|
||||
Destination: &p.SkipRowOnError,
|
||||
},
|
||||
// NOTE: The old CLI allowed this flag to be used as an int _or_ a bool, with the bool form being
|
||||
// short-hand for N=1. urfave/cli isn't that flexible.
|
||||
&cli.IntFlag{
|
||||
Name: "skipHeader",
|
||||
Usage: "Skip the first <n> rows from input data",
|
||||
Destination: &p.SkipHeader,
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "max-line-length",
|
||||
Usage: "Specifies the maximum number of bytes that can be read for a single line",
|
||||
Value: 16_000_000,
|
||||
Destination: &p.MaxLineLength,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "xIgnoreDataTypeInColumnName",
|
||||
Usage: "Ignores dataType which could be specified after ':' in column name",
|
||||
Hidden: true,
|
||||
Destination: &p.IgnoreDataTypeInColumnName,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "encoding",
|
||||
Usage: "Character encoding of input files or stdin",
|
||||
Value: "UTF-8",
|
||||
Destination: &p.Encoding,
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "errors-file",
|
||||
Usage: "The path to the file to write rejected rows to",
|
||||
TakesFile: true,
|
||||
Destination: &p.ErrorsFile,
|
||||
},
|
||||
&cli.GenericFlag{
|
||||
Name: "rate-limit",
|
||||
Usage: `Throttles write, examples: "5 MB / 5 min" , "17kBs"`,
|
||||
DefaultText: "no throttling",
|
||||
Value: &p.RateLimit,
|
||||
},
|
||||
&cli.GenericFlag{
|
||||
Name: "compression",
|
||||
Usage: "Input compression, either 'none' or 'gzip'",
|
||||
DefaultText: "'none' unless an input has a '.gz' extension",
|
||||
Value: &p.Compression,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func parseFormat(f string) (linereader.InputFormat, error) {
|
||||
switch f {
|
||||
case "":
|
||||
return linereader.InputFormatDerived, nil
|
||||
case "lp":
|
||||
return linereader.InputFormatLP, nil
|
||||
case "csv":
|
||||
return linereader.InputFormatCSV, nil
|
||||
default:
|
||||
return 0, fmt.Errorf("unsupported format: %q", f)
|
||||
}
|
||||
}
|
||||
|
||||
func parseCompression(c string) (linereader.InputCompression, error) {
|
||||
switch c {
|
||||
case "":
|
||||
return linereader.InputCompressionDerived, nil
|
||||
case "none":
|
||||
return linereader.InputCompressionNone, nil
|
||||
case "gzip":
|
||||
return linereader.InputCompressionGZIP, nil
|
||||
default:
|
||||
return 0, fmt.Errorf("unsupported compression: %q", c)
|
||||
func newWriteCmd() *cli.Command {
|
||||
params := writeParams{
|
||||
WriteParams: internal.WriteParams{
|
||||
Precision: api.WRITEPRECISION_NS,
|
||||
},
|
||||
}
|
||||
return &cli.Command{
|
||||
Name: "write",
|
||||
Usage: "Write points to InfluxDB",
|
||||
Description: "Write data to InfluxDB via stdin, or add an entire file specified with the -f flag",
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
||||
Flags: append(commonFlagsNoPrint, params.Flags()...),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
errorFile, err := params.makeErrorFile()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { _ = errorFile.Close() }()
|
||||
|
||||
client := getAPI(ctx)
|
||||
writeClients := &internal.WriteClients{
|
||||
Client: client.WriteApi,
|
||||
Reader: params.makeLineReader(ctx.Args().Slice(), errorFile),
|
||||
Throttler: throttler.NewThrottler(params.RateLimit),
|
||||
Writer: &batcher.BufferBatcher{
|
||||
MaxFlushBytes: batcher.DefaultMaxBytes,
|
||||
MaxFlushInterval: batcher.DefaultInterval,
|
||||
MaxLineLength: params.MaxLineLength,
|
||||
},
|
||||
}
|
||||
|
||||
return getCLI(ctx).Write(ctx.Context, writeClients, ¶ms.WriteParams)
|
||||
},
|
||||
Subcommands: []*cli.Command{
|
||||
newWriteDryRun(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func parsePrecision(p string) (api.WritePrecision, error) {
|
||||
switch p {
|
||||
case "ms":
|
||||
return api.WRITEPRECISION_MS, nil
|
||||
case "s":
|
||||
return api.WRITEPRECISION_S, nil
|
||||
case "us":
|
||||
return api.WRITEPRECISION_US, nil
|
||||
case "ns":
|
||||
return api.WRITEPRECISION_NS, nil
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported precision: %q", p)
|
||||
func newWriteDryRun() *cli.Command {
|
||||
params := writeParams{
|
||||
WriteParams: internal.WriteParams{
|
||||
Precision: api.WRITEPRECISION_NS,
|
||||
},
|
||||
}
|
||||
|
||||
return &cli.Command{
|
||||
Name: "dryrun",
|
||||
Usage: "Write to stdout instead of InfluxDB",
|
||||
Description: "Write protocol lines to stdout instead of InfluxDB. Troubleshoot conversion from CSV to line protocol",
|
||||
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
||||
Flags: append(commonFlagsNoPrint, params.Flags()...),
|
||||
Action: func(ctx *cli.Context) error {
|
||||
errorFile, err := params.makeErrorFile()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { _ = errorFile.Close() }()
|
||||
|
||||
return getCLI(ctx).WriteDryRun(ctx.Context, params.makeLineReader(ctx.Args().Slice(), errorFile))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
25
internal/api/write_precision.go
Normal file
25
internal/api/write_precision.go
Normal file
@ -0,0 +1,25 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func (v WritePrecision) String() string {
|
||||
return string(v)
|
||||
}
|
||||
|
||||
func (v *WritePrecision) Set(s string) error {
|
||||
switch s {
|
||||
case "ms":
|
||||
*v = WRITEPRECISION_MS
|
||||
case "s":
|
||||
*v = WRITEPRECISION_S
|
||||
case "us":
|
||||
*v = WRITEPRECISION_US
|
||||
case "ns":
|
||||
*v = WRITEPRECISION_NS
|
||||
default:
|
||||
return fmt.Errorf("unsupported precision: %q", s)
|
||||
}
|
||||
return nil
|
||||
}
|
@ -29,6 +29,33 @@ const (
|
||||
InputFormatLP
|
||||
)
|
||||
|
||||
func (i *InputFormat) Set(v string) error {
|
||||
switch v {
|
||||
case "":
|
||||
*i = InputFormatDerived
|
||||
case "lp":
|
||||
*i = InputFormatLP
|
||||
case "csv":
|
||||
*i = InputFormatCSV
|
||||
default:
|
||||
return fmt.Errorf("unsupported format: %q", v)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i InputFormat) String() string {
|
||||
switch i {
|
||||
case InputFormatLP:
|
||||
return "lp"
|
||||
case InputFormatCSV:
|
||||
return "csv"
|
||||
case InputFormatDerived:
|
||||
fallthrough
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
type InputCompression int
|
||||
|
||||
const (
|
||||
@ -37,6 +64,33 @@ const (
|
||||
InputCompressionNone
|
||||
)
|
||||
|
||||
func (i *InputCompression) Set(v string) error {
|
||||
switch v {
|
||||
case "":
|
||||
*i = InputCompressionDerived
|
||||
case "none":
|
||||
*i = InputCompressionNone
|
||||
case "gzip":
|
||||
*i = InputCompressionGZIP
|
||||
default:
|
||||
return fmt.Errorf("unsupported compression: %q", v)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i InputCompression) String() string {
|
||||
switch i {
|
||||
case InputCompressionNone:
|
||||
return "none"
|
||||
case InputCompressionGZIP:
|
||||
return "gzip"
|
||||
case InputCompressionDerived:
|
||||
fallthrough
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
type MultiInputLineReader struct {
|
||||
StdIn io.Reader
|
||||
HttpClient HttpClient
|
||||
|
@ -16,12 +16,8 @@ type Throttler struct {
|
||||
bytesPerSecond float64
|
||||
}
|
||||
|
||||
func NewThrottler(rate string) (*Throttler, error) {
|
||||
bytesPerSec, err := ToBytesPerSecond(rate)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Throttler{bytesPerSecond: bytesPerSec}, nil
|
||||
func NewThrottler(bytesPerSec BytesPerSec) *Throttler {
|
||||
return &Throttler{bytesPerSecond: float64(bytesPerSec)}
|
||||
}
|
||||
|
||||
func (t *Throttler) Throttle(ctx context.Context, in io.Reader) io.Reader {
|
||||
@ -41,10 +37,25 @@ var rateLimitRegexp = regexp.MustCompile(`^(\d*\.?\d*)(B|kB|MB)/?(\d*)?(s|sec|m|
|
||||
var bytesUnitMultiplier = map[string]float64{"B": 1, "kB": 1024, "MB": 1_048_576}
|
||||
var timeUnitMultiplier = map[string]float64{"s": 1, "sec": 1, "m": 60, "min": 60}
|
||||
|
||||
type BytesPerSec float64
|
||||
|
||||
func (b *BytesPerSec) Set(v string) (err error) {
|
||||
bps, err := ToBytesPerSecond(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*b = BytesPerSec(bps)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b BytesPerSec) String() string {
|
||||
return strconv.FormatFloat(float64(b), 'E', -1, 64)
|
||||
}
|
||||
|
||||
// ToBytesPerSecond converts rate from string to number. The supplied string
|
||||
// value format must be COUNT(B|kB|MB)/TIME(s|sec|m|min) with / and TIME being optional.
|
||||
// All spaces are ignored, they can help with formatting. Examples: "5 MB / 5 min", 17kbs. 5.1MB5m.
|
||||
func ToBytesPerSecond(rateLimit string) (float64, error) {
|
||||
func ToBytesPerSecond(rateLimit string) (BytesPerSec, error) {
|
||||
// ignore all spaces
|
||||
strVal := strings.ReplaceAll(rateLimit, " ", "")
|
||||
if len(strVal) == 0 {
|
||||
@ -74,5 +85,5 @@ func ToBytesPerSecond(rateLimit string) (float64, error) {
|
||||
time = float64(int64Val)
|
||||
}
|
||||
time = time * timeUnitMultiplier[matches[4]]
|
||||
return bytes / time, nil
|
||||
return BytesPerSec(bytes / time), nil
|
||||
}
|
||||
|
@ -14,8 +14,9 @@ func TestThrottlerPassthrough(t *testing.T) {
|
||||
// Hard to test that rate-limiting actually works, so we just check
|
||||
// that no data is lost.
|
||||
in := "Hello world!"
|
||||
throttler, err := throttler.NewThrottler("1B/s")
|
||||
bps, err := throttler.ToBytesPerSecond("1B/s")
|
||||
require.NoError(t, err)
|
||||
throttler := throttler.NewThrottler(bps)
|
||||
r := throttler.Throttle(context.Background(), strings.NewReader(in))
|
||||
|
||||
out := bytes.Buffer{}
|
||||
@ -72,7 +73,7 @@ func TestToBytesPerSecond(t *testing.T) {
|
||||
t.Run(test.in, func(t *testing.T) {
|
||||
bytesPerSec, err := throttler.ToBytesPerSecond(test.in)
|
||||
if len(test.error) == 0 {
|
||||
require.Equal(t, test.out, bytesPerSec)
|
||||
require.Equal(t, test.out, float64(bytesPerSec))
|
||||
require.Nil(t, err)
|
||||
} else {
|
||||
require.NotNil(t, err)
|
||||
|
20
pkg/cli/middleware/middleware.go
Normal file
20
pkg/cli/middleware/middleware.go
Normal file
@ -0,0 +1,20 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
// WithBeforeFns returns a cli.BeforeFunc that calls each of the provided
|
||||
// functions in order.
|
||||
// NOTE: The first function to return an error will end execution and
|
||||
// be returned as the error value of the composed function.
|
||||
func WithBeforeFns(fns ...cli.BeforeFunc) cli.BeforeFunc {
|
||||
return func(ctx *cli.Context) error {
|
||||
for _, fn := range fns {
|
||||
if err := fn(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
132
pkg/influxid/id.go
Normal file
132
pkg/influxid/id.go
Normal file
@ -0,0 +1,132 @@
|
||||
package influxid
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// IDLength is the exact length a string (or a byte slice representing it) must have in order to be decoded into a valid ID.
|
||||
const IDLength = 16
|
||||
|
||||
var (
|
||||
// ErrInvalidID signifies invalid IDs.
|
||||
ErrInvalidID = errors.New("invalid ID")
|
||||
|
||||
// ErrInvalidIDLength is returned when an ID has the incorrect number of bytes.
|
||||
ErrInvalidIDLength = errors.New("id must have a length of 16 bytes")
|
||||
)
|
||||
|
||||
// ID is a unique identifier.
|
||||
//
|
||||
// Its zero value is not a valid ID.
|
||||
type ID uint64
|
||||
|
||||
// IDFromString creates an ID from a given string.
|
||||
//
|
||||
// It errors if the input string does not match a valid ID.
|
||||
func IDFromString(str string) (ID, error) {
|
||||
var id ID
|
||||
err := id.DecodeFromString(str)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// MustIDFromString is like IDFromString but panics if
|
||||
// s is not a valid base-16 identifier.
|
||||
func MustIDFromString(s string) ID {
|
||||
id, err := IDFromString(s)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
// Decode parses b as a hex-encoded byte-slice-string.
|
||||
//
|
||||
// It errors if the input byte slice does not have the correct length
|
||||
// or if it contains all zeros.
|
||||
func (i *ID) Decode(b []byte) error {
|
||||
if len(b) != IDLength {
|
||||
return ErrInvalidIDLength
|
||||
}
|
||||
|
||||
res, err := strconv.ParseUint(string(b), 16, 64)
|
||||
if err != nil {
|
||||
return ErrInvalidID
|
||||
}
|
||||
|
||||
if *i = ID(res); !i.Valid() {
|
||||
return ErrInvalidID
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DecodeFromString parses s as a hex-encoded string.
|
||||
func (i *ID) DecodeFromString(s string) error {
|
||||
return i.Decode([]byte(s))
|
||||
}
|
||||
|
||||
// Encode converts ID to a hex-encoded byte-slice-string.
|
||||
//
|
||||
// It errors if the receiving ID holds its zero value.
|
||||
func (i ID) Encode() ([]byte, error) {
|
||||
if !i.Valid() {
|
||||
return nil, ErrInvalidID
|
||||
}
|
||||
|
||||
b := make([]byte, hex.DecodedLen(IDLength))
|
||||
binary.BigEndian.PutUint64(b, uint64(i))
|
||||
|
||||
dst := make([]byte, hex.EncodedLen(len(b)))
|
||||
hex.Encode(dst, b)
|
||||
return dst, nil
|
||||
}
|
||||
|
||||
// Valid checks whether the receiving ID is a valid one or not.
|
||||
func (i ID) Valid() bool {
|
||||
return i != 0
|
||||
}
|
||||
|
||||
// String returns the ID as a hex encoded string.
|
||||
//
|
||||
// Returns an empty string in the case the ID is invalid.
|
||||
func (i ID) String() string {
|
||||
enc, _ := i.Encode()
|
||||
return string(enc)
|
||||
}
|
||||
|
||||
// GoString formats the ID the same as the String method.
|
||||
// Without this, when using the %#v verb, an ID would be printed as a uint64,
|
||||
// so you would see e.g. 0x2def021097c6000 instead of 02def021097c6000
|
||||
// (note the leading 0x, which means the former doesn't show up in searches for the latter).
|
||||
func (i ID) GoString() string {
|
||||
return `"` + i.String() + `"`
|
||||
}
|
||||
|
||||
// MarshalText encodes i as text.
|
||||
// Providing this method is a fallback for json.Marshal,
|
||||
// with the added benefit that IDs encoded as map keys will be the expected string encoding,
|
||||
// rather than the effective fmt.Sprintf("%d", i) that json.Marshal uses by default for integer types.
|
||||
func (i ID) MarshalText() ([]byte, error) {
|
||||
return i.Encode()
|
||||
}
|
||||
|
||||
// UnmarshalText decodes i from a byte slice.
|
||||
// Providing this method is also a fallback for json.Unmarshal,
|
||||
// also relevant when IDs are used as map keys.
|
||||
func (i *ID) UnmarshalText(b []byte) error {
|
||||
return i.Decode(b)
|
||||
}
|
||||
|
||||
func (i *ID) Set(s string) error {
|
||||
id, err := IDFromString(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*i = id
|
||||
return nil
|
||||
}
|
223
pkg/influxid/id_test.go
Normal file
223
pkg/influxid/id_test.go
Normal file
@ -0,0 +1,223 @@
|
||||
package influxid_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influx-cli/v2/pkg/influxid"
|
||||
)
|
||||
|
||||
func TestIDFromString(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
id string
|
||||
want influxid.ID
|
||||
wantErr bool
|
||||
err string
|
||||
}{
|
||||
{
|
||||
name: "Should be able to decode an all zeros ID",
|
||||
id: "0000000000000000",
|
||||
wantErr: true,
|
||||
err: influxid.ErrInvalidID.Error(),
|
||||
},
|
||||
{
|
||||
name: "Should be able to decode an all f ID",
|
||||
id: "ffffffffffffffff",
|
||||
want: influxid.MustIDFromString("ffffffffffffffff"),
|
||||
},
|
||||
{
|
||||
name: "Should be able to decode an ID",
|
||||
id: "020f755c3c082000",
|
||||
want: influxid.MustIDFromString("020f755c3c082000"),
|
||||
},
|
||||
{
|
||||
name: "Should not be able to decode a non hex ID",
|
||||
id: "gggggggggggggggg",
|
||||
wantErr: true,
|
||||
err: influxid.ErrInvalidID.Error(),
|
||||
},
|
||||
{
|
||||
name: "Should not be able to decode inputs with length less than 16 bytes",
|
||||
id: "abc",
|
||||
wantErr: true,
|
||||
err: influxid.ErrInvalidIDLength.Error(),
|
||||
},
|
||||
{
|
||||
name: "Should not be able to decode inputs with length greater than 16 bytes",
|
||||
id: "abcdabcdabcdabcd0",
|
||||
wantErr: true,
|
||||
err: influxid.ErrInvalidIDLength.Error(),
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := influxid.IDFromString(tt.id)
|
||||
|
||||
// Check negative test cases
|
||||
if (err != nil) && tt.wantErr {
|
||||
if tt.err != err.Error() {
|
||||
t.Errorf("IDFromString() errors out \"%s\", want \"%s\"", err, tt.err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Check positive test cases
|
||||
if !reflect.DeepEqual(got, tt.want) && !tt.wantErr {
|
||||
t.Errorf("IDFromString() outputs %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecodeFromString(t *testing.T) {
|
||||
var id influxid.ID
|
||||
err := id.DecodeFromString("020f755c3c082000")
|
||||
if err != nil {
|
||||
t.Errorf(err.Error())
|
||||
}
|
||||
want := []byte{48, 50, 48, 102, 55, 53, 53, 99, 51, 99, 48, 56, 50, 48, 48, 48}
|
||||
got, _ := id.Encode()
|
||||
if !bytes.Equal(want, got) {
|
||||
t.Errorf("got %s not equal to wanted %s", string(got), string(want))
|
||||
}
|
||||
if id.String() != "020f755c3c082000" {
|
||||
t.Errorf("expecting string representation to contain the right value")
|
||||
}
|
||||
if !id.Valid() {
|
||||
t.Errorf("expecting ID to be a valid one")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEncode(t *testing.T) {
|
||||
var id influxid.ID
|
||||
if _, err := id.Encode(); err == nil {
|
||||
t.Errorf("encoding an invalid ID should not be possible")
|
||||
}
|
||||
|
||||
id.DecodeFromString("5ca1ab1eba5eba11")
|
||||
want := []byte{53, 99, 97, 49, 97, 98, 49, 101, 98, 97, 53, 101, 98, 97, 49, 49}
|
||||
got, _ := id.Encode()
|
||||
if !bytes.Equal(want, got) {
|
||||
t.Errorf("encoding error")
|
||||
}
|
||||
if id.String() != "5ca1ab1eba5eba11" {
|
||||
t.Errorf("expecting string representation to contain the right value")
|
||||
}
|
||||
if !id.Valid() {
|
||||
t.Errorf("expecting ID to be a valid one")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecodeFromAllZeros(t *testing.T) {
|
||||
var id influxid.ID
|
||||
err := id.Decode(make([]byte, influxid.IDLength))
|
||||
if err == nil {
|
||||
t.Errorf("expecting all zeros ID to not be a valid ID")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecodeFromShorterString(t *testing.T) {
|
||||
var id influxid.ID
|
||||
err := id.DecodeFromString("020f75")
|
||||
if err == nil {
|
||||
t.Errorf("expecting shorter inputs to error")
|
||||
}
|
||||
if id.String() != "" {
|
||||
t.Errorf("expecting invalid ID to be serialized into empty string")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecodeFromLongerString(t *testing.T) {
|
||||
var id influxid.ID
|
||||
err := id.DecodeFromString("020f755c3c082000aaa")
|
||||
if err == nil {
|
||||
t.Errorf("expecting shorter inputs to error")
|
||||
}
|
||||
if id.String() != "" {
|
||||
t.Errorf("expecting invalid ID to be serialized into empty string")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDecodeFromEmptyString(t *testing.T) {
|
||||
var id influxid.ID
|
||||
err := id.DecodeFromString("")
|
||||
if err == nil {
|
||||
t.Errorf("expecting empty inputs to error")
|
||||
}
|
||||
if id.String() != "" {
|
||||
t.Errorf("expecting invalid ID to be serialized into empty string")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarshalling(t *testing.T) {
|
||||
var id0 influxid.ID
|
||||
_, err := json.Marshal(id0)
|
||||
if err == nil {
|
||||
t.Errorf("expecting empty ID to not be a valid one")
|
||||
}
|
||||
|
||||
init := "ca55e77eca55e77e"
|
||||
id1, err := influxid.IDFromString(init)
|
||||
if err != nil {
|
||||
t.Errorf(err.Error())
|
||||
}
|
||||
|
||||
serialized, err := json.Marshal(id1)
|
||||
if err != nil {
|
||||
t.Errorf(err.Error())
|
||||
}
|
||||
|
||||
var id2 influxid.ID
|
||||
json.Unmarshal(serialized, &id2)
|
||||
|
||||
bytes1, _ := id1.Encode()
|
||||
bytes2, _ := id2.Encode()
|
||||
|
||||
if !bytes.Equal(bytes1, bytes2) {
|
||||
t.Errorf("error marshalling/unmarshalling ID")
|
||||
}
|
||||
|
||||
// When used as a map key, IDs must use their string encoding.
|
||||
// If you only implement json.Marshaller, they will be encoded with Go's default integer encoding.
|
||||
b, err := json.Marshal(map[influxid.ID]int{0x1234: 5678})
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
const exp = `{"0000000000001234":5678}`
|
||||
if string(b) != exp {
|
||||
t.Errorf("expected map to json.Marshal as %s; got %s", exp, string(b))
|
||||
}
|
||||
|
||||
var idMap map[influxid.ID]int
|
||||
if err := json.Unmarshal(b, &idMap); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if len(idMap) != 1 {
|
||||
t.Errorf("expected length 1, got %d", len(idMap))
|
||||
}
|
||||
if idMap[0x1234] != 5678 {
|
||||
t.Errorf("unmarshalled incorrectly; exp 0x1234:5678, got %v", idMap)
|
||||
}
|
||||
}
|
||||
|
||||
func TestID_GoString(t *testing.T) {
|
||||
type idGoStringTester struct {
|
||||
ID influxid.ID
|
||||
}
|
||||
var x idGoStringTester
|
||||
|
||||
const idString = "02def021097c6000"
|
||||
if err := x.ID.DecodeFromString(idString); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
sharpV := fmt.Sprintf("%#v", x)
|
||||
want := `influxid_test.idGoStringTester{ID:"` + idString + `"}`
|
||||
if sharpV != want {
|
||||
t.Fatalf("bad GoString: got %q, want %q", sharpV, want)
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user