Files
influx-cli/cmd/influx/write.go
Sam Arnold 9747d05ae1 refactor: expose generated code and client business logic to share with Kapacitor (#103)
* refactor: take clients out of internal

* refactor: move stdio to pkg

* Move internal/api to api

* refactor: final changes for Kapacitor to access shared functionality

* chore: regenerate mocks

* fix: bad automated refactor

* chore: extra formatting not caught by make fmt
2021-05-25 10:05:01 -04:00

248 lines
7.1 KiB
Go

package main
import (
"fmt"
"io"
"net/http"
"os"
"github.com/influxdata/influx-cli/v2/api"
"github.com/influxdata/influx-cli/v2/clients/write"
"github.com/influxdata/influx-cli/v2/pkg/cli/middleware"
"github.com/urfave/cli/v2"
)
type writeParams struct {
Files cli.StringSlice
URLs cli.StringSlice
Format write.InputFormat
Compression write.InputCompression
Encoding string
// CSV-specific options.
Headers cli.StringSlice
SkipRowOnError bool
SkipHeader int
IgnoreDataTypeInColumnName bool
Debug bool
ErrorsFile string
MaxLineLength int
RateLimit write.BytesPerSec
write.Params
}
func (p *writeParams) makeLineReader(args []string, errorOut io.Writer) *write.MultiInputLineReader {
return &write.MultiInputLineReader{
StdIn: os.Stdin,
HttpClient: http.DefaultClient,
ErrorOut: errorOut,
Args: args,
Files: p.Files.Value(),
URLs: p.URLs.Value(),
Format: p.Format,
Compression: p.Compression,
Encoding: p.Encoding,
Headers: p.Headers.Value(),
SkipRowOnError: p.SkipRowOnError,
SkipHeader: p.SkipHeader,
IgnoreDataTypeInColumnName: p.IgnoreDataTypeInColumnName,
Debug: p.Debug,
}
}
func (p *writeParams) makeErrorFile() (*os.File, error) {
if p.ErrorsFile == "" {
return nil, nil
}
errorFile, err := os.Open(p.ErrorsFile)
if err != nil {
return nil, fmt.Errorf("failed to open errors-file: %w", err)
}
return errorFile, nil
}
func (p *writeParams) Flags() []cli.Flag {
return []cli.Flag{
&cli.StringFlag{
Name: "bucket-id",
Usage: "The ID of destination bucket",
EnvVars: []string{"INFLUX_BUCKET_ID"},
Destination: &p.BucketID,
},
&cli.StringFlag{
Name: "bucket",
Usage: "The name of destination bucket",
Aliases: []string{"b"},
EnvVars: []string{"INFLUX_BUCKET_NAME"},
Destination: &p.BucketName,
},
&cli.StringFlag{
Name: "org-id",
Usage: "The ID of the organization",
EnvVars: []string{"INFLUX_ORG_ID"},
Destination: &p.OrgID,
},
&cli.StringFlag{
Name: "org",
Usage: "The name of the organization",
Aliases: []string{"o"},
EnvVars: []string{"INFLUX_ORG"},
Destination: &p.OrgName,
},
&cli.GenericFlag{
Name: "precision",
Usage: "Precision of the timestamps of the lines",
Aliases: []string{"p"},
EnvVars: []string{"INFLUX_PRECISION"},
Value: &p.Precision,
},
&cli.GenericFlag{
Name: "format",
Usage: "Input format, either 'lp' (Line Protocol) or 'csv' (Comma Separated Values)",
DefaultText: "'lp' unless '.csv' extension",
Value: &p.Format,
},
&cli.StringSliceFlag{
Name: "header",
Usage: "Header prepends lines to input data",
Destination: &p.Headers,
},
&cli.StringSliceFlag{
Name: "file",
Usage: "The path to the file to import",
Aliases: []string{"f"},
TakesFile: true,
Destination: &p.Files,
},
&cli.StringSliceFlag{
Name: "url",
Usage: "The URL to import data from",
Aliases: []string{"u"},
Destination: &p.URLs,
},
&cli.BoolFlag{
Name: "debug",
Usage: "Log CSV columns to stderr before reading data rows",
Destination: &p.Debug,
},
&cli.BoolFlag{
Name: "skipRowOnError",
Usage: "Log CSV data errors to stderr and continue with CSV processing",
Destination: &p.SkipRowOnError,
},
// NOTE: The old CLI allowed this flag to be used as an int _or_ a bool, with the bool form being
// short-hand for N=1. urfave/cli isn't that flexible.
&cli.IntFlag{
Name: "skipHeader",
Usage: "Skip the first <n> rows from input data",
Destination: &p.SkipHeader,
},
&cli.IntFlag{
Name: "max-line-length",
Usage: "Specifies the maximum number of bytes that can be read for a single line",
Value: 16_000_000,
Destination: &p.MaxLineLength,
},
&cli.BoolFlag{
Name: "xIgnoreDataTypeInColumnName",
Usage: "Ignores dataType which could be specified after ':' in column name",
Hidden: true,
Destination: &p.IgnoreDataTypeInColumnName,
},
&cli.StringFlag{
Name: "encoding",
Usage: "Character encoding of input files or stdin",
Value: "UTF-8",
Destination: &p.Encoding,
},
&cli.StringFlag{
Name: "errors-file",
Usage: "The path to the file to write rejected rows to",
TakesFile: true,
Destination: &p.ErrorsFile,
},
&cli.GenericFlag{
Name: "rate-limit",
Usage: `Throttles write, examples: "5 MB / 5 min" , "17kBs"`,
DefaultText: "no throttling",
Value: &p.RateLimit,
},
&cli.GenericFlag{
Name: "compression",
Usage: "Input compression, either 'none' or 'gzip'",
DefaultText: "'none' unless an input has a '.gz' extension",
Value: &p.Compression,
},
}
}
func newWriteCmd() *cli.Command {
params := writeParams{
Params: write.Params{
Precision: api.WRITEPRECISION_NS,
},
}
return &cli.Command{
Name: "write",
Usage: "Write points to InfluxDB",
Description: "Write data to InfluxDB via stdin, or add an entire file specified with the -f flag",
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
Flags: append(commonFlagsNoPrint(), params.Flags()...),
Action: func(ctx *cli.Context) error {
errorFile, err := params.makeErrorFile()
if err != nil {
return err
}
defer func() { _ = errorFile.Close() }()
client := &write.Client{
CLI: getCLI(ctx),
WriteApi: getAPI(ctx).WriteApi,
LineReader: params.makeLineReader(ctx.Args().Slice(), errorFile),
RateLimiter: write.NewThrottler(params.RateLimit),
BatchWriter: &write.BufferBatcher{
MaxFlushBytes: write.DefaultMaxBytes,
MaxFlushInterval: write.DefaultInterval,
MaxLineLength: params.MaxLineLength,
},
}
return client.Write(ctx.Context, &params.Params)
},
Subcommands: []*cli.Command{
newWriteDryRun(),
},
}
}
func newWriteDryRun() *cli.Command {
params := writeParams{
Params: write.Params{
Precision: api.WRITEPRECISION_NS,
},
}
return &cli.Command{
Name: "dryrun",
Usage: "Write to stdout instead of InfluxDB",
Description: "Write protocol lines to stdout instead of InfluxDB. Troubleshoot conversion from CSV to line protocol",
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
Flags: append(commonFlagsNoPrint(), params.Flags()...),
Action: func(ctx *cli.Context) error {
errorFile, err := params.makeErrorFile()
if err != nil {
return err
}
defer func() { _ = errorFile.Close() }()
client := write.DryRunClient{
CLI: getCLI(ctx),
LineReader: params.makeLineReader(ctx.Args().Slice(), errorFile),
}
return client.WriteDryRun(ctx.Context)
},
}
}