
When passed `--errors-file`, the file should not need to already exist on disk to be written into
245 lines
6.8 KiB
Go
245 lines
6.8 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
|
|
"github.com/influxdata/influx-cli/v2/api"
|
|
"github.com/influxdata/influx-cli/v2/clients/write"
|
|
"github.com/influxdata/influx-cli/v2/pkg/cli/middleware"
|
|
"github.com/urfave/cli"
|
|
)
|
|
|
|
type writeParams struct {
|
|
Files cli.StringSlice
|
|
URLs cli.StringSlice
|
|
Format write.InputFormat
|
|
Compression write.InputCompression
|
|
Encoding string
|
|
|
|
// CSV-specific options.
|
|
Headers cli.StringSlice
|
|
SkipRowOnError bool
|
|
SkipHeader int
|
|
IgnoreDataTypeInColumnName bool
|
|
Debug bool
|
|
|
|
ErrorsFile string
|
|
MaxLineLength int
|
|
RateLimit write.BytesPerSec
|
|
|
|
write.Params
|
|
}
|
|
|
|
func (p *writeParams) makeLineReader(args []string, errorOut io.Writer) *write.MultiInputLineReader {
|
|
return &write.MultiInputLineReader{
|
|
StdIn: os.Stdin,
|
|
HttpClient: http.DefaultClient,
|
|
ErrorOut: errorOut,
|
|
Args: args,
|
|
Files: p.Files.Value(),
|
|
URLs: p.URLs.Value(),
|
|
Format: p.Format,
|
|
Compression: p.Compression,
|
|
Encoding: p.Encoding,
|
|
Headers: p.Headers.Value(),
|
|
SkipRowOnError: p.SkipRowOnError,
|
|
SkipHeader: p.SkipHeader,
|
|
IgnoreDataTypeInColumnName: p.IgnoreDataTypeInColumnName,
|
|
Debug: p.Debug,
|
|
}
|
|
}
|
|
|
|
func (p *writeParams) makeErrorFile() (*os.File, error) {
|
|
if p.ErrorsFile == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
_, err := os.Stat(p.ErrorsFile)
|
|
if os.IsNotExist(err) {
|
|
file, err := os.Create(p.ErrorsFile)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create errors-file: %w", err)
|
|
}
|
|
return file, nil
|
|
}
|
|
|
|
errorFile, err := os.Open(p.ErrorsFile)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open errors-file: %w", err)
|
|
}
|
|
return errorFile, nil
|
|
}
|
|
|
|
func (p *writeParams) Flags() []cli.Flag {
|
|
return append(getOrgFlags(&p.OrgParams), []cli.Flag{
|
|
&cli.StringFlag{
|
|
Name: "bucket-id",
|
|
Usage: "The ID of destination bucket",
|
|
EnvVar: "INFLUX_BUCKET_ID",
|
|
Destination: &p.BucketID,
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "bucket, b",
|
|
Usage: "The name of destination bucket",
|
|
EnvVar: "INFLUX_BUCKET_NAME",
|
|
Destination: &p.BucketName,
|
|
},
|
|
&cli.GenericFlag{
|
|
Name: "precision, p",
|
|
Usage: "Precision of the timestamps of the lines",
|
|
EnvVar: "INFLUX_PRECISION",
|
|
Value: &p.Precision,
|
|
},
|
|
&cli.GenericFlag{
|
|
Name: "format",
|
|
Usage: "Input format, either 'lp' (Line Protocol) or 'csv' (Comma Separated Values)",
|
|
Value: &p.Format,
|
|
},
|
|
&cli.StringSliceFlag{
|
|
Name: "header",
|
|
Usage: "Header prepends lines to input data",
|
|
Value: &p.Headers,
|
|
},
|
|
&cli.StringSliceFlag{
|
|
Name: "file, f",
|
|
Usage: "The path to the file to import",
|
|
TakesFile: true,
|
|
Value: &p.Files,
|
|
},
|
|
&cli.StringSliceFlag{
|
|
Name: "url, u",
|
|
Usage: "The URL to import data from",
|
|
Value: &p.URLs,
|
|
},
|
|
&cli.BoolFlag{
|
|
Name: "debug",
|
|
Usage: "Log CSV columns to stderr before reading data rows",
|
|
Destination: &p.Debug,
|
|
},
|
|
&cli.BoolFlag{
|
|
Name: "skipRowOnError",
|
|
Usage: "Log CSV data errors to stderr and continue with CSV processing",
|
|
Destination: &p.SkipRowOnError,
|
|
},
|
|
// NOTE: The old CLI allowed this flag to be used as an int _or_ a bool, with the bool form being
|
|
// short-hand for N=1. urfave/cli isn't that flexible.
|
|
&cli.IntFlag{
|
|
Name: "skipHeader",
|
|
Usage: "Skip the first <n> rows from input data",
|
|
Destination: &p.SkipHeader,
|
|
},
|
|
&cli.IntFlag{
|
|
Name: "max-line-length",
|
|
Usage: "Specifies the maximum number of bytes that can be read for a single line",
|
|
Value: 16_000_000,
|
|
Destination: &p.MaxLineLength,
|
|
},
|
|
&cli.BoolFlag{
|
|
Name: "xIgnoreDataTypeInColumnName",
|
|
Usage: "Ignores dataType which could be specified after ':' in column name",
|
|
Hidden: true,
|
|
Destination: &p.IgnoreDataTypeInColumnName,
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "encoding",
|
|
Usage: "Character encoding of input files or stdin",
|
|
Value: "UTF-8",
|
|
Destination: &p.Encoding,
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "errors-file",
|
|
Usage: "The path to the file to write rejected rows to",
|
|
TakesFile: true,
|
|
Destination: &p.ErrorsFile,
|
|
},
|
|
&cli.GenericFlag{
|
|
Name: "rate-limit",
|
|
Usage: `Throttles write, examples: "5 MB / 5 min" , "17kBs"`,
|
|
Value: &p.RateLimit,
|
|
},
|
|
&cli.GenericFlag{
|
|
Name: "compression",
|
|
Usage: "Input compression, either 'none' or 'gzip'",
|
|
Value: &p.Compression,
|
|
},
|
|
}...)
|
|
}
|
|
|
|
func newWriteCmd() cli.Command {
|
|
params := writeParams{
|
|
Params: write.Params{
|
|
Precision: api.WRITEPRECISION_NS,
|
|
},
|
|
}
|
|
return cli.Command{
|
|
Name: "write",
|
|
Usage: "Write points to InfluxDB",
|
|
Description: "Write data to InfluxDB via stdin, or add an entire file specified with the -f flag",
|
|
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
|
Flags: append(commonFlagsNoPrint(), params.Flags()...),
|
|
Action: func(ctx *cli.Context) error {
|
|
if err := checkOrgFlags(¶ms.OrgParams); err != nil {
|
|
return err
|
|
}
|
|
errorFile, err := params.makeErrorFile()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { _ = errorFile.Close() }()
|
|
|
|
client := &write.Client{
|
|
CLI: getCLI(ctx),
|
|
WriteApi: getAPI(ctx).WriteApi,
|
|
LineReader: params.makeLineReader(ctx.Args(), errorFile),
|
|
RateLimiter: write.NewThrottler(params.RateLimit),
|
|
BatchWriter: &write.BufferBatcher{
|
|
MaxFlushBytes: write.DefaultMaxBytes,
|
|
MaxFlushInterval: write.DefaultInterval,
|
|
MaxLineLength: params.MaxLineLength,
|
|
},
|
|
}
|
|
|
|
return client.Write(getContext(ctx), ¶ms.Params)
|
|
},
|
|
Subcommands: []cli.Command{
|
|
newWriteDryRun(),
|
|
},
|
|
}
|
|
}
|
|
|
|
func newWriteDryRun() cli.Command {
|
|
params := writeParams{
|
|
Params: write.Params{
|
|
Precision: api.WRITEPRECISION_NS,
|
|
},
|
|
}
|
|
|
|
return cli.Command{
|
|
Name: "dryrun",
|
|
Usage: "Write to stdout instead of InfluxDB",
|
|
Description: "Write protocol lines to stdout instead of InfluxDB. Troubleshoot conversion from CSV to line protocol",
|
|
Before: middleware.WithBeforeFns(withCli(), withApi(true)),
|
|
Flags: append(commonFlagsNoPrint(), params.Flags()...),
|
|
Action: func(ctx *cli.Context) error {
|
|
if err := checkOrgFlags(¶ms.OrgParams); err != nil {
|
|
return err
|
|
}
|
|
errorFile, err := params.makeErrorFile()
|
|
fmt.Println(params.OrgBucketParams.OrgName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() { _ = errorFile.Close() }()
|
|
|
|
client := write.DryRunClient{
|
|
CLI: getCLI(ctx),
|
|
LineReader: params.makeLineReader(ctx.Args(), errorFile),
|
|
}
|
|
return client.WriteDryRun(getContext(ctx))
|
|
},
|
|
}
|
|
}
|